Daemonized servers are now always launched with popen, and not directly invoked in...
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
10 import getpass
11 import cPickle
12 import sys
13 import time
14 import tempfile
15 import shutil
16 import functools
17 import os
18
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# (code 3 is not assigned to any instruction in this list)
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
TESTBED_STATUS = 43
STARTED_TIME = 44
STOPPED_TIME = 45

# Printable name for every reply / instruction code defined above.
# Every code must have an entry here: a code without one cannot be
# rendered by name when messages are logged.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TESTBED_STATUS: "TESTBED_STATUS",
    TRACES_INFO: "TRACES_INFO",
    STARTED_TIME: "STARTED_TIME",
    STOPPED_TIME: "STOPPED_TIME",
}
114
def log_msg(server, params):
    """
    Write an incoming protocol message to the server's debug log.

    server -- object providing log_debug(text); its class name prefixes
              the log line.
    params -- the message already split on "|": params[0] is the
              instruction code, the remaining items are its arguments.

    Logging is best-effort: a malformed message never raises from here.
    """
    try:
        instr = int(params[0])
        # Fall back to the numeric code so that an instruction without a
        # printable name is still logged instead of silently dropped.
        instr_txt = instruction_text.get(instr, instr)
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging (but let KeyboardInterrupt/SystemExit through)
        pass
124
def log_reply(server, reply):
    """
    Write an outgoing protocol reply to the server's debug log.

    server -- object providing log_debug(text); its class name prefixes
              the log line.
    reply  -- reply string of the form "<code>|<payload>".

    The payload is shown base64-decoded when it decodes, verbatim
    otherwise.  If the reply cannot be parsed at all, it is logged raw.
    """
    try:
        res = reply.split("|")
        code = int(res[0])
        code_txt = instruction_text[code]
        try:
            txt = base64.b64decode(res[1])
        except Exception:
            # payload is not base64: show it as it came
            txt = res[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
                code_txt, txt))
    except Exception:
        # don't die for logging: fall back to the unparsed reply
        server.log_debug("%s - reply: %s" % (server.__class__.__name__,
                reply))
141
def to_server_log_level(log_level):
    """
    Map a deployment-configuration log level onto a server log level.

    Returns server.DEBUG_LEVEL when log_level equals DC.DEBUG_LEVEL and
    server.ERROR_LEVEL for any other value.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
148
def get_access_config_params(access_config):
    """
    Extract the deployment parameters from an access configuration.

    access_config -- object exposing get_attribute_value(name) and
                     has_attribute(name).

    Returns the tuple
        (root_dir, log_level, communication, user, host, port, key,
         agent, sudo, environment_setup)
    where log_level has already been converted with
    to_server_log_level() and environment_setup is "" when the
    configuration has no DC.DEPLOYMENT_ENVIRONMENT_SETUP attribute.
    """
    value_of = access_config.get_attribute_value

    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = ""
    user = value_of(DC.DEPLOYMENT_USER)
    host = value_of(DC.DEPLOYMENT_HOST)
    port = value_of(DC.DEPLOYMENT_PORT)
    agent = value_of(DC.USE_AGENT)
    sudo = value_of(DC.USE_SUDO)
    key = value_of(DC.DEPLOYMENT_KEY)
    return (root_dir, log_level, communication, user, host, port, key, agent,
            sudo, environment_setup)
169
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every entry of
    Metadata.PROXY_ATTRIBUTES.

    params -- optional mapping of attribute name -> unparsed value; each
              value is converted with the parser registered for the
              attribute's type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # Imported at call time, not at module import time.
        from nepi.core.metadata import Metadata

        for _name, attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
184
class TempDir(object):
    """
    Owns a freshly created temporary directory (self.path) and removes
    it, with everything inside, when the object is destroyed.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # Finalizers must not raise: the directory may already have been
        # removed by someone else, or __init__ may have failed before
        # self.path was assigned.
        path = getattr(self, 'path', None)
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
191
class PermDir(object):
    """
    Plain holder for a caller-supplied directory path (self.path).
    It defines no cleanup of its own.
    """
    def __init__(self, path):
        self.path = path
195
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment XML.

    xml           -- experiment description handed to the controller.
    access_config -- optional access configuration; its DC.DEPLOYMENT_MODE
                     selects in-process or daemon operation and its
                     DC.RECOVER flag requests recovery instead of a fresh
                     launch.

    Returns an ExperimentController (no mode / single-process mode) or an
    ExperimentControllerProxy (daemon mode).
    Raises RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController

        if access_config and access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        else:
            root_dir = TempDir()
        controller = ExperimentController(xml, root_dir.path)

        # Keep the directory holder alive for as long as the controller
        # is, so a temporary directory gets cleaned up at destruction time.
        controller._tempdir = root_dir

        if not launch:
            # try to recover
            controller.recover()

        return controller

    if mode == DC.MODE_DAEMON:
        (root_dir, log_level, communication, user, host, port, key, agent,
                sudo, environment_setup) = get_access_config_params(access_config)

        # Everything except 'launch' is the same for both attempts below.
        proxy_kwargs = dict(
            experiment_xml = xml,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup)
        try:
            return ExperimentControllerProxy(root_dir, log_level,
                launch = launch, **proxy_kwargs)
        except:
            if launch:
                raise
            # Maybe controller died, recover from persisted testbed information if possible
            controller = ExperimentControllerProxy(root_dir, log_level,
                launch = True, **proxy_kwargs)
            controller.recover()
            return controller

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
253
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    access_config -- access configuration (may be None/empty); its
                     DC.DEPLOYMENT_MODE selects in-process or daemon
                     operation and its DC.RECOVER flag disables launching.

    Returns the testbed's own controller object (no mode / single-process
    mode) or a TestbedControllerProxy (daemon mode).
    Raises ValueError when recovery is requested in single-process mode,
    and RuntimeError for an unknown deployment mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
    launch = True if not access_config \
            else not access_config.get_attribute_value(DC.RECOVER)
    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            # An in-process controller cannot be re-attached to.
            raise ValueError("Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        (root_dir, log_level, communication, user, host, port, key, agent,
                sudo, environment_setup) = get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level,
                testbed_id = testbed_id,
                testbed_version = testbed_version,
                communication = communication,
                host = host,
                port = port,
                ident_key = key,
                user = user,
                agent = agent,
                sudo = sudo,
                launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
279
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Import the module implementing testbed_id and instantiate its
    TestbedController.

    Raises ImportError when the module cannot be imported, and
    RuntimeError when the controller's testbed_version differs from the
    requested testbed_version.
    """
    mod_name = nepi.util.environ.find_testbed(testbed_id)

    if mod_name not in sys.modules:
        try:
            __import__(mod_name)
        except ImportError:
            # Keep the original failure in the message: the module may
            # exist and fail on one of its own imports.
            cause = sys.exc_info()[1]
            raise ImportError("Cannot find module %s in %r (%s)" % (
                mod_name, sys.path, cause))

    module = sys.modules[mod_name]
    tc = module.TestbedController()
    if tc.testbed_version != testbed_version:
        raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
                (testbed_id, testbed_version, tc.testbed_version))
    return tc
295
# Just a namespace class
class Marshalling:
    # Groups the wire-format helpers of the proxy protocol:
    #  - Decoders / Encoders: per-type conversion from / to the string
    #    form used inside "|"-separated messages,
    #  - args / retval / retvoid / handles: decorators that turn a plain
    #    method into a command handler taking the split message.

    class Decoders:
        # Each decoder takes the string found in a message and returns
        # the decoded value.

        # NOTE(review): this unpickles whatever the peer sent; only safe
        # as long as the peer is trusted.
        @staticmethod
        def pickled_data(sdata):
            return cPickle.loads(base64.b64decode(sdata))
        
        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)
        
        # The literal string "None" stands for None; anything else is an int.
        @staticmethod
        def nullint(sdata):
            return None if sdata == "None" else int(sdata)

        # Only the exact string 'True' decodes to True.
        @staticmethod
        def bool(sdata):
            return sdata == 'True'
        
    class Encoders:
        # Each encoder is the counterpart of the decoder with the same name.

        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))
        
        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)
        
        # Returns an int (not a string) for non-None input; it is turned
        # into text by the "%s" format it is paired with in _TYPE_ENCODERS.
        @staticmethod
        def nullint(data):
            return "None" if data is None else int(data)
        
        @staticmethod
        def bool(data):
            return str(bool(data))
           
    # import into Marshalling all the decoders
    # they act as types
    # (this is what makes Marshalling.pickled_data, Marshalling.nullint,
    # etc. available as names)
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).iteritems()
        if not typname.startswith('_')
    ])

    _TYPE_ENCODERS = dict([
        # type name -> (<encoding_function>, <formatting_string>)
        (typname, (getattr(Encoders,typname),"%s"))
        for typname in vars(Decoders)
        if not typname.startswith('_')
           and hasattr(Encoders,typname)
    ])

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")
    
    # Generic encoder
    # (used for any type whose __name__ has no entry above)
    _TYPE_ENCODERS[None] = (str, "%s")
    
    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).
        
        The first argument (self) is left untouched.
        
        params[0] (the command code) is skipped; params[1:] are paired
        with the given types. If fewer parameters than types are
        supplied, only the supplied ones are passed on, so the wrapped
        function's own defaults apply to the rest.
        
        The returned function carries _argtypes, _argencoders and
        _retval attributes.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])
            
            rv._argtypes = types
            
            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            argencoders = []
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            for typ in types:
                if typ.__name__ in TYPE_ENCODERS:
                    argencoders.append(TYPE_ENCODERS[typ.__name__])
                else:
                    # generic encoder
                    argencoders.append(TYPE_ENCODERS[None])
            
            rv._argencoders = tuple(argencoders)
            
            # propagate the return type recorded by retval/retvoid, if any
            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that 
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.
        
        The reply has the form "<OK>|<encoded value>".
        
        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        # The encoder is looked up by type name; unknown types use the
        # generic (str, "%s") encoder.
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt
        
        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor
    
    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that 
        always return an encoded empty string.
        
        Useful for null-returning functions.
        """
        # The reply is always "<OK>|"; whatever f returns is discarded.
        OKRV = "%d|" % (OK,)
        
        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV
        
        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv
    
    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        
        The code is stored on the function as _handles_command.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
465
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated protocol messages to the methods
    of the concrete class tagged with Marshalling.handles().
    """
    def reply_action(self, msg):
        """
        Handle one message and return the reply string.

        msg -- "<instruction code>|<arg>|<arg>...".

        The handler is the public method defined directly on self's class
        whose _handles_command equals the instruction code.  An empty
        message, a malformed or unknown instruction code, or an exception
        inside the handler all produce an "<ERROR>|<base64 text>" reply
        instead of raising.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                # Parsed inside the try block so that a non-numeric
                # instruction code is answered with an ERROR reply
                # rather than escaping as an exception.
                instruction = int(params[0])
                log_msg(self, params)
                for mname,meth in vars(self.__class__).iteritems():
                    if not mname.startswith('_'):
                        cmd = getattr(meth, '_handles_command', None)
                        if cmd == instruction:
                            # re-fetch through the instance to get a bound method
                            meth = getattr(self, mname)
                            reply = meth(params)
                            break
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Deliberately broad: this is the boundary where any
                # failure is reported back to the client.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
494
class TestbedControllerServer(BaseServer):
    # Exposes a testbed controller through the proxy protocol.
    #
    # Every handler below is bound to one instruction code with
    # Marshalling.handles(), gets its arguments decoded according to
    # Marshalling.args(), forwards the call to self._testbed and encodes
    # the result with Marshalling.retval() / Marshalling.retvoid.
    def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
        super(TestbedControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup )
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created later, in post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        # Builds the actual testbed controller for the id/version given
        # to the constructor.
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str, int)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    # guid may arrive as "None" (decoded to None by nullint)
    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        return self._testbed.status(guid)

    @Marshalling.handles(TESTBED_STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def testbed_status(self):
        return self._testbed.testbed_status()

    # filter_flags and exclude fall back to their defaults when the
    # message carries fewer arguments
    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._testbed.recover()
730
731
class ExperimentControllerServer(BaseServer):
    """
    Exposes an experiment controller through the proxy protocol.

    Every handler is bound to one instruction code with
    Marshalling.handles(), gets its arguments decoded according to
    Marshalling.args(), forwards the call to self._experiment and encodes
    the result with Marshalling.retval() / Marshalling.retvoid.
    """
    def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup )
        self._experiment_xml = experiment_xml
        # created later, in post_daemonize()
        self._experiment = None

    def post_daemonize(self):
        """Create the wrapped ExperimentController from the stored XML."""
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._experiment.guids

    @Marshalling.handles(STARTED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def started_time(self):
        return self._experiment.started_time

    @Marshalling.handles(STOPPED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def stopped_time(self):
        return self._experiment.stopped_time

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        return self._experiment.experiment_design_xml
        
    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        return self._experiment.experiment_execute_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._experiment.is_finished(guid)

    @Marshalling.handles(STATUS)
    @Marshalling.args(int)
    @Marshalling.retval(int)
    def status(self, guid):
        # STATUS must report the element's status; it used to forward to
        # is_finished(), duplicating the FINISHED handler.
        # NOTE(review): relies on ExperimentController.status(guid) -- confirm.
        return self._experiment.status(guid)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._experiment.get_testbed_version(guid)
851
852 class BaseProxy(object):
853     _ServerClass = None
854     _ServerClassModule = "nepi.util.proxy"
855     
856     def __init__(self, ctor_args, root_dir, 
857             launch = True, 
858             communication = DC.ACCESS_LOCAL,
859             host = None, 
860             port = None, 
861             user = None, 
862             ident_key = None, 
863             agent = None,
864             sudo = False, 
865             environment_setup = ""):
866         if launch:
867             python_code = (
868                     "from %(classmodule)s import %(classname)s;"
869                     "s = %(classname)s%(ctor_args)r;"
870                     "s.run()" 
871                 % dict(
872                     classname = self._ServerClass.__name__,
873                     classmodule = self._ServerClassModule,
874                     ctor_args = ctor_args
875                 ) )
876             proc = server.popen_python(python_code,
877                         communication = communication,
878                         host = host,
879                         port = port, 
880                         user = user, 
881                         agent = agent,
882                         ident_key = ident_key, 
883                         sudo = sudo,
884                         environment_setup = environment_setup) 
885             # Wait for the server to be ready, otherwise nobody
886             # will be able to connect to it
887             helo = proc.stderr.readline()
888             if helo != 'SERVER_READY.\n':
889                 raise AssertionError, "Expected 'SERVER_READY.', got %r: %s" % (helo,
890                         helo + proc.stderr.read())
891         # connect client to server
892         self._client = server.Client(root_dir, 
893                 communication = communication,
894                 host = host, 
895                 port = port, 
896                 user = user, 
897                 agent = agent, 
898                 sudo = sudo,
899                 environment_setup = environment_setup)
900     
901     @staticmethod
902     def _make_message(argtypes, argencoders, command, methname, classname, *args):
903         if len(argtypes) != len(argencoders):
904             raise ValueError, "Invalid arguments for _make_message: "\
905                 "in stub method %s of class %s "\
906                 "argtypes and argencoders must match in size" % (
907                     methname, classname )
908         if len(argtypes) != len(args):
909             raise ValueError, "Invalid arguments for _make_message: "\
910                 "in stub method %s of class %s "\
911                 "expected %d arguments, got %d" % (
912                     methname, classname,
913                     len(argtypes), len(args))
914         
915         buf = []
916         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
917             try:
918                 buf.append(fmt % encode(val))
919             except:
920                 import traceback
921                 raise TypeError, "Argument %d of stub method %s of class %s "\
922                     "requires a value of type %s, but got %s - nested error: %s" % (
923                         argnum, methname, classname,
924                         getattr(typ, '__name__', typ), type(val),
925                         traceback.format_exc()
926                 )
927         
928         return "%d|%s" % (command, '|'.join(buf))
929     
930     @staticmethod
931     def _parse_reply(rvtype, methname, classname, reply):
932         if not reply:
933             raise RuntimeError, "Invalid reply: %r "\
934                 "for stub method %s of class %s" % (
935                     reply,
936                     methname,
937                     classname)
938         
939         try:
940             result = reply.split("|")
941             code = int(result[0])
942             text = result[1]
943         except:
944             import traceback
945             raise TypeError, "Return value of stub method %s of class %s "\
946                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
947                     methname, classname,
948                     getattr(rvtype, '__name__', rvtype), reply,
949                     traceback.format_exc()
950             )
951         if code == ERROR:
952             text = base64.b64decode(text)
953             raise RuntimeError(text)
954         elif code == OK:
955             try:
956                 if rvtype is None:
957                     return
958                 else:
959                     return rvtype(text)
960             except:
961                 import traceback
962                 raise TypeError, "Return value of stub method %s of class %s "\
963                     "cannot be parsed: must be of type %s - nested error: %s" % (
964                         methname, classname,
965                         getattr(rvtype, '__name__', rvtype),
966                         traceback.format_exc()
967                 )
968         else:
969             raise RuntimeError, "Invalid reply: %r "\
970                 "for stub method %s of class %s - unknown code" % (
971                     reply,
972                     methname,
973                     classname)
974     
975     @staticmethod
976     def _make_stubs(server_class, template_class):
977         """
978         Returns a dictionary method_name -> method
979         with stub methods.
980         
981         Usage:
982         
983             class SomeProxy(BaseProxy):
984                ...
985                
986                locals().update( BaseProxy._make_stubs(
987                     ServerClass,
988                     TemplateClass
989                ) )
990         
991         ServerClass is the corresponding Server class, as
992         specified in the _ServerClass class method (_make_stubs
993         is static and can't access the method), and TemplateClass
994         is the ultimate implementation class behind the server,
995         from which argument names and defaults are taken, to
996         maintain meaningful interfaces.
997         """
998         rv = {}
999         
1000         class NONE: pass
1001         
1002         import os.path
1003         func_template_path = os.path.join(
1004             os.path.dirname(__file__),
1005             'proxy_stub.tpl')
1006         func_template_file = open(func_template_path, "r")
1007         func_template = func_template_file.read()
1008         func_template_file.close()
1009         
1010         for methname in vars(template_class).copy():
1011             if methname.endswith('_deferred'):
1012                 # cannot wrap deferreds...
1013                 continue
1014             dmethname = methname+'_deferred'
1015             if hasattr(server_class, methname) and not methname.startswith('_'):
1016                 template_meth = getattr(template_class, methname)
1017                 server_meth = getattr(server_class, methname)
1018                 
1019                 command = getattr(server_meth, '_handles_command', None)
1020                 argtypes = getattr(server_meth, '_argtypes', None)
1021                 argencoders = getattr(server_meth, '_argencoders', None)
1022                 rvtype = getattr(server_meth, '_retval', None)
1023                 doprop = False
1024                 
1025                 if hasattr(template_meth, 'fget'):
1026                     # property getter
1027                     template_meth = template_meth.fget
1028                     doprop = True
1029                 
1030                 if command is not None and argtypes is not None and argencoders is not None:
1031                     # We have an interface method...
1032                     code = template_meth.func_code
1033                     argnames = code.co_varnames[:code.co_argcount]
1034                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1035                                   + (template_meth.func_defaults or ()) )
1036                     
1037                     func_globals = dict(
1038                         BaseProxy = BaseProxy,
1039                         argtypes = argtypes,
1040                         argencoders = argencoders,
1041                         rvtype = rvtype,
1042                         functools = functools,
1043                     )
1044                     context = dict()
1045                     
1046                     func_text = func_template % dict(
1047                         self = argnames[0],
1048                         args = '%s' % (','.join(argnames[1:])),
1049                         argdefs = ','.join([
1050                             argname if argdef is NONE
1051                             else "%s=%r" % (argname, argdef)
1052                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
1053                         ]),
1054                         command = command,
1055                         methname = methname,
1056                         classname = server_class.__name__
1057                     )
1058                     
1059                     func_text = compile(
1060                         func_text,
1061                         func_template_path,
1062                         'exec')
1063                     
1064                     exec func_text in func_globals, context
1065                     
1066                     if doprop:
1067                         rv[methname] = property(context[methname])
1068                         rv[dmethname] = property(context[dmethname])
1069                     else:
1070                         rv[methname] = context[methname]
1071                         rv[dmethname] = context[dmethname]
1072                     
1073                     # inject _deferred into core classes
1074                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1075                         def freezename(methname, dmethname):
1076                             def dmeth(self, *p, **kw): 
1077                                 return getattr(self, methname)(*p, **kw)
1078                             dmeth.__name__ = dmethname
1079                             return dmeth
1080                         dmeth = freezename(methname, dmethname)
1081                         setattr(template_class, dmethname, dmeth)
1082         
1083         return rv
1084                         
class TestbedControllerProxy(BaseProxy):
    """
    Client-side proxy for a TestbedControllerServer.

    Its public methods are stubs generated by BaseProxy._make_stubs,
    using nepi.core.execute.TestbedController as the template for
    argument names and defaults.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, 
            testbed_id = None, 
            testbed_version = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = ""):
        """
        Optionally launch the testbed controller server and connect to it.

        root_dir, log_level, testbed_id, testbed_version and
        environment_setup become the server constructor arguments; the
        remaining arguments are passed through to BaseProxy.__init__.

        Raises RuntimeError when launch is requested without both
        testbed_id and testbed_version.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
            root_dir = root_dir,
            launch = launch,
            communication = communication,
            host = host, 
            port = port, 
            user = user,
            ident_key = ident_key, 
            agent = agent, 
            sudo = sudo, 
            environment_setup = environment_setup)

    # Inject the generated stubs into the class namespace. This must come
    # before the shutdown override below, which captures the generated
    # shutdown stub as a default argument.
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Call the generated shutdown stub, then send a stop through the
        client and wait for its reply. Returns the stub's result.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1128     
1129
class ExperimentControllerProxy(BaseProxy):
    """
    Client-side proxy for an ExperimentControllerServer.

    Its public methods are stubs generated by BaseProxy._make_stubs,
    using nepi.core.execute.ExperimentController as the template for
    argument names and defaults.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, 
            experiment_xml = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None, 
            sudo = False, 
            environment_setup = ""):
        """
        Optionally launch the experiment controller server and connect to it.

        root_dir, log_level, experiment_xml and environment_setup become
        the server constructor arguments; everything else is passed
        through to BaseProxy.__init__.
        """
        # Arguments for the server constructor in the spawned process.
        server_ctor_args = (root_dir, log_level, experiment_xml, environment_setup)
        # Settings passed unchanged to the base class.
        passthrough = dict(
            launch = launch,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
        )
        super(ExperimentControllerProxy, self).__init__(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            **passthrough)

    # Inject the generated stubs into the class namespace. This must come
    # before the shutdown override below, which captures the generated
    # shutdown stub as a default argument.
    locals().update( BaseProxy._make_stubs(
        ExperimentControllerServer,
        nepi.core.execute.ExperimentController,
    ) )

    
    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Call the generated shutdown stub, then send a stop through the
        client and wait for its reply. Returns the stub's result.
        """
        result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for it
        return result
1169