Wait for SERVER_READY or PROXY_READ, instead of expecting it as the first line.
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
10 import getpass
11 import cPickle
12 import sys
13 import time
14 import tempfile
15 import shutil
16 import functools
17 import os
18
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# The instruction code is the first "|"-separated field of every message
# (see BaseServer.reply_action), so existing values must not be renumbered.
# The value 3 is not assigned in this table.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
TESTBED_STATUS = 43
STARTED_TIME = 44
STOPPED_TIME = 45

# Human readable name of every reply/instruction code, used for logging.
# Every code defined above must have an entry here: log_msg and log_reply
# look codes up in this table.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TESTBED_STATUS: "TESTBED_STATUS",
    TRACES_INFO: "TRACES_INFO",
    STARTED_TIME: "STARTED_TIME",
    STOPPED_TIME: "STOPPED_TIME",
}
114
def log_msg(server, params):
    """
    Log an incoming protocol message through server.log_debug.

    params is the already split message: params[0] is the numeric
    instruction code and the remaining items are its arguments.

    Logging is best-effort: any error raised while formatting or emitting
    the line is swallowed so that logging never disrupts the caller.
    Interpreter-level exceptions (KeyboardInterrupt, SystemExit) are not
    swallowed.
    """
    try:
        instr = int(params[0])
        # Unknown codes are still logged, tagged with their numeric value,
        # instead of being silently dropped.
        instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % (instr,))
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging
        pass
124
def log_reply(server, reply):
    """
    Log an outgoing reply through server.log_debug.

    reply is expected to look like "<code>|<base64 payload>". The code is
    translated with instruction_text and the payload is base64-decoded when
    possible (it is logged verbatim otherwise). If the reply cannot be
    parsed at all, the raw reply string is logged instead.

    Logging is best-effort: this function never raises because of a
    logging problem. Interpreter-level exceptions (KeyboardInterrupt,
    SystemExit) are not swallowed.
    """
    try:
        fields = reply.split("|")
        code = int(fields[0])
        code_txt = instruction_text[code]
        try:
            txt = base64.b64decode(fields[1])
        except Exception:
            # payload is not valid base64: show it as received
            txt = fields[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
                code_txt, txt))
    except Exception:
        # don't die for logging: fall back to the unparsed reply, and
        # tolerate a failure of the fallback itself
        try:
            server.log_debug("%s - reply: %s" % (server.__class__.__name__, 
                    reply))
        except Exception:
            pass
141
def to_server_log_level(log_level):
    """
    Reduce a configured log level to one of the two values returned here.

    DC.DEBUG_LEVEL is passed through unchanged; any other value is
    reported as DC.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return DC.DEBUG_LEVEL
    return DC.ERROR_LEVEL
148
def get_access_config_params(access_config):
    """
    Extract the deployment parameters from an access configuration.

    Returns the tuple
        (mode, launch, root_dir, log_level, communication, user, host,
         port, key, agent, sudo, environment_setup, clean_root)
    where:
      - launch is the negation of the DC.RECOVER attribute,
      - log_level has been normalized with to_server_log_level,
      - environment_setup is "" when the configuration does not define
        DC.DEPLOYMENT_ENVIRONMENT_SETUP.
    """
    value_of = access_config.get_attribute_value

    mode = value_of(DC.DEPLOYMENT_MODE)
    launch = not value_of(DC.RECOVER)
    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    # read once (the original code fetched this attribute twice)
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = ""
    user = value_of(DC.DEPLOYMENT_USER)
    host = value_of(DC.DEPLOYMENT_HOST)
    port = value_of(DC.DEPLOYMENT_PORT)
    agent = value_of(DC.USE_AGENT)
    sudo = value_of(DC.USE_SUDO)
    key = value_of(DC.DEPLOYMENT_KEY)
    clean_root = value_of(DC.CLEAN_ROOT)
    return (mode, launch, root_dir, log_level, communication, user, host, port,
            key, agent, sudo, environment_setup, clean_root)
171
class AccessConfiguration(AttributesMap):
    """
    Attribute map holding one attribute per entry of
    Metadata.PROXY_ATTRIBUTES.

    params, when given and non-empty, maps attribute names to raw values;
    each value is converted with the parser registered in
    Attribute.type_parsers for the attribute's type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported at call time, not at module import time
        from nepi.core.metadata import Metadata

        for _name, attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
186
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    The directory is created with tempfile.mkdtemp() on construction and
    its path is exposed as the 'path' attribute. The directory and its
    contents are removed when the object is finalized.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()
    
    def __del__(self):
        # The finalizer may run on an instance whose __init__ failed before
        # 'path' was set, or after the directory was already removed.
        # Cleanup is best-effort and must not raise from here.
        path = getattr(self, 'path', None)
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
193
class PermDir(object):
    """
    Plain holder for a caller-supplied directory path.

    It exposes the same 'path' attribute as TempDir but defines no
    finalizer, so nothing is removed when the object goes away.
    """
    def __init__(self, path):
        self.path = path
197
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller for the given experiment XML.

    Without an access configuration (or with DC.MODE_SINGLE_PROCESS) an
    in-process ExperimentController is returned. Its root directory is a
    TempDir unless the configuration defines DC.ROOT_DIRECTORY. With
    DC.MODE_DAEMON an ExperimentControllerProxy is returned instead.

    When the configuration asks for recovery (launch is False):
      - in single-process mode, recover() is called on the new controller;
      - in daemon mode, a connection to an existing server is attempted
        first; if that fails a new server is launched and recover() is
        called on it.

    As a side effect the NEPI_CONTROLLER_LOGLEVEL environment variable is
    set to the effective log level.

    Raises RuntimeError for any other deployment mode.
    """
    mode = None
    launch = True
    log_level = DC.ERROR_LEVEL
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port, 
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController
        
        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)
        
        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir
        
        if not launch:
            # try to recover
            controller.recover()
        
        return controller

    if mode == DC.MODE_DAEMON:
        # Arguments shared by the first attempt and the recovery relaunch;
        # only 'launch' differs between the two.
        proxy_args = dict(
            experiment_xml = xml,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
            clean_root = clean_root)
        try:
            return ExperimentControllerProxy(root_dir, log_level,
                launch = launch, **proxy_args)
        except Exception:
            # KeyboardInterrupt / SystemExit are deliberately not caught:
            # an interrupted connection attempt must not trigger a relaunch.
            if launch:
                raise
            # Maybe controller died, recover from persisted testbed information if possible
            controller = ExperimentControllerProxy(root_dir, log_level,
                launch = True, **proxy_args)
            controller.recover()
            return controller

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
261
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller for the given testbed id and version.

    Without an access configuration (or with DC.MODE_SINGLE_PROCESS) the
    controller is built in-process by _build_testbed_controller; this mode
    cannot be combined with recovery (launch False) and raises ValueError
    in that case. With DC.MODE_DAEMON a TestbedControllerProxy is returned.

    As a side effect the NEPI_CONTROLLER_LOGLEVEL environment variable is
    set to the effective log level.

    Raises RuntimeError for any other deployment mode.
    """
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port, 
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)
    else:
        # defaults used when no access configuration is supplied
        mode, launch, log_level = None, True, DC.ERROR_LEVEL

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if launch:
            return _build_testbed_controller(testbed_id, testbed_version)
        raise ValueError(
            "Unsupported instantiation mode: %s with launch=False" % (mode,))

    if mode != DC.MODE_DAEMON:
        raise RuntimeError("Unsupported access configuration '%s'" % mode)

    proxy_options = dict(
        testbed_id = testbed_id,
        testbed_version = testbed_version,
        communication = communication,
        host = host,
        port = port,
        ident_key = key,
        user = user,
        agent = agent,
        sudo = sudo,
        launch = launch,
        environment_setup = environment_setup,
        clean_root = clean_root)
    return TestbedControllerProxy(root_dir, log_level, **proxy_options)
292
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController of the module implementing testbed_id.

    The module name comes from nepi.util.environ.find_testbed; the module
    is imported if it is not loaded yet.

    Raises ImportError when the module cannot be imported and RuntimeError
    when the controller's testbed_version differs from the requested one.
    """
    module_name = nepi.util.environ.find_testbed(testbed_id)

    already_loaded = module_name in sys.modules
    if not already_loaded:
        try:
            __import__(module_name)
        except ImportError:
            raise ImportError(
                "Cannot find module %s in %r" % (module_name, sys.path))

    controller = sys.modules[module_name].TestbedController()
    if controller.testbed_version != testbed_version:
        raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
                (testbed_id, testbed_version, controller.testbed_version))
    return controller
308
# Just a namespace class
# It groups the decoders/encoders used for message fields together with the
# decorators (args, retval, retvoid, handles) that server handler methods
# use to declare how their arguments are parsed and their result encoded.
class Marshalling:
    class Decoders:
        # Each decoder turns one received string field back into a value.
        @staticmethod
        def pickled_data(sdata):
            # SECURITY NOTE(review): unpickling can execute arbitrary code;
            # this is only safe when the peer that produced sdata is trusted.
            return cPickle.loads(base64.b64decode(sdata))
        
        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)
        
        @staticmethod
        def nullint(sdata):
            # the literal string "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)

        @staticmethod
        def bool(sdata):
            # only the exact string 'True' decodes to True
            return sdata == 'True'
        
    class Encoders:
        # Each encoder is the counterpart of the decoder of the same name.
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))
        
        @staticmethod
        def base64_data(data):
            return base64.b64encode(data)
        
        @staticmethod
        def nullint(data):
            return "None" if data is None else int(data)
        
        @staticmethod
        def bool(data):
            return str(bool(data))
           
    # import into Marshalling all the decoders
    # they act as types
    # (vars(Decoders) holds the staticmethod objects, so after this update
    # they are reachable as Marshalling.<name>)
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).iteritems()
        if not typname.startswith('_')
    ])

    _TYPE_ENCODERS = dict([
        # type name -> (<encoding_function>, <formatting_string>)
        (typname, (getattr(Encoders,typname),"%s"))
        for typname in vars(Decoders)
        if not typname.startswith('_')
           and hasattr(Encoders,typname)
    ])

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")
    
    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")
    
    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).
        
        The first argument (self) is left untouched.
        
        The wrapper also carries _argtypes, _argencoders and _retval
        attributes describing the call.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                # params[0] is skipped (BaseServer.reply_action reads the
                # instruction code from it). zip stops at the shorter
                # sequence, so missing trailing params are simply not
                # passed to f.
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])
            
            rv._argtypes = types
            
            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            argencoders = []
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            for typ in types:
                # lookup is by the type's __name__, not by the type object
                if typ.__name__ in TYPE_ENCODERS:
                    argencoders.append(TYPE_ENCODERS[typ.__name__])
                else:
                    # generic encoder
                    argencoders.append(TYPE_ENCODERS[None])
            
            rv._argencoders = tuple(argencoders)
            
            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that 
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.
        
        The encoded string has the form "<OK>|<encoded value>".
        
        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.
        
        eg:
        
        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        # unknown type names fall back to the generic (str, "%s") encoder
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        # prefix the value format with the reply code field
        fmt = "%d|"+fmt
        
        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor
    
    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that 
        always returns an OK reply with an empty payload ("<OK>|").
        
        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)
        
        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV
        
        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv
    
    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        """
        def decor(f):
            # BaseServer.reply_action dispatches on this attribute
            f._handles_command = whichcommand
            return f
        return decor
478
class BaseServer(server.Server):
    """
    Server base class that dispatches "|"-separated protocol messages to
    the handler methods declared with Marshalling.handles.
    """
    def reply_action(self, msg):
        """
        Process one message and return the encoded reply string.

        The first "|"-separated field of msg is the numeric instruction
        code. The handler is the public attribute defined directly on this
        instance's class (inherited attributes are not scanned) whose
        _handles_command equals that code; it is called with the full
        list of fields and its result is the reply.

        An empty message, a non-numeric or unknown instruction, and any
        exception raised by the handler all produce an
        "<ERROR>|<base64 text>" reply instead of raising.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                # Parsed inside the try block so that a malformed
                # (non-numeric) instruction yields an ERROR reply instead
                # of an exception escaping from the server.
                instruction = int(params[0])
                log_msg(self, params)
                for mname,meth in vars(self.__class__).iteritems():
                    if not mname.startswith('_'):
                        cmd = getattr(meth, '_handles_command', None)
                        if cmd == instruction:
                            # fetch through the instance to get a bound method
                            meth = getattr(self, mname)
                            reply = meth(params)
                            break
                else:
                    # for/else: no handler matched the instruction
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Kept as a catch-all: every failure is turned into an
                # ERROR reply carrying the text returned by log_error().
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
507
class TestbedControllerServer(BaseServer):
    """
    Server exposing a testbed controller through the proxy protocol.

    Every public method below is a handler: Marshalling.handles binds it to
    an instruction code, Marshalling.args declares how the message fields
    are decoded into its arguments, and Marshalling.retval / retvoid
    declares how its result is encoded into the reply. Each handler simply
    delegates to the wrapped testbed controller (self._testbed).
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
            environment_setup, clean_root):
        super(TestbedControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # built later, in post_daemonize
        self._testbed = None

    def post_daemonize(self):
        # Instantiate the actual testbed controller for the configured
        # testbed id/version.
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # NOTE(review): unlike ExperimentControllerServer.trace the result
        # is not wrapped in str() before base64 encoding - confirm the
        # testbed always returns a string here.
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str, int)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        # 'time' shadows the time module inside this method only
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        # guid may be None (sent as the string "None", see nullint)
        return self._testbed.status(guid)

    @Marshalling.handles(TESTBED_STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def testbed_status(self):
        return self._testbed.testbed_status()

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        # the defaults apply when the message carries fewer fields
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._testbed.get_factory_id(guid)

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._testbed.recover()
744
745
class ExperimentControllerServer(BaseServer):
    """
    Server exposing an ExperimentController through the proxy protocol.

    Every public method below is a handler: Marshalling.handles binds it to
    an instruction code, Marshalling.args declares how the message fields
    are decoded into its arguments, and Marshalling.retval / retvoid
    declares how its result is encoded into the reply. Each handler
    delegates to the wrapped controller (self._experiment).
    """
    def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
            clean_root):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        self._experiment_xml = experiment_xml
        # built later, in post_daemonize
        self._experiment = None

    def post_daemonize(self):
        # Instantiate the actual controller from the stored experiment XML.
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        return self._experiment.guids

    @Marshalling.handles(STARTED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def started_time(self):
        return self._experiment.started_time

    @Marshalling.handles(STOPPED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def stopped_time(self):
        return self._experiment.stopped_time

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        return self._experiment.experiment_design_xml
        
    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        return self._experiment.experiment_execute_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        # str() because the default retval encoder base64-encodes a string
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        return self._experiment.is_finished(guid)

    @Marshalling.handles(STATUS)
    @Marshalling.args(int)
    @Marshalling.retval(int)
    def status(self, guid):
        # STATUS reports the element's status code. The previous body
        # returned is_finished(guid) (the FINISHED answer), which the int
        # encoder collapsed to 0/1.
        # NOTE(review): relies on ExperimentController.status(guid) - confirm.
        return self._experiment.status(guid)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        # 'time' shadows the time module inside this method only
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        return self._experiment.get_testbed_version(guid)
866
class BaseProxy(object):
    """
    Client-side base class for proxies that talk to a server object.

    On construction it optionally launches the server (an instance of
    ``_ServerClass`` imported from ``_ServerClassModule``) through
    ``server.popen_python``, waits until the server announces
    ``SERVER_READY.`` on its stderr, and then creates the
    ``server.Client`` used to talk to it.

    Subclasses set ``_ServerClass`` and normally inject the stub methods
    built by ``_make_stubs`` into their class body.
    """
    # Server class instantiated in the launched process; set by subclasses.
    _ServerClass = None
    # Module the launched process imports _ServerClass from.
    _ServerClassModule = "nepi.util.proxy"
    
    def __init__(self, ctor_args, root_dir, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "",
            clean_root = False):
        """
        Optionally launch the server and connect a client to it.

        ctor_args: tuple whose repr() is appended to the server class
            name to form the constructor call run in the launched process.
        root_dir: passed to ``server.Client``.
        launch: when true, start the server with ``server.popen_python``
            and block until it prints ``SERVER_READY.`` on stderr.
        communication, host, port, user, agent, sudo, environment_setup:
            forwarded to both ``server.popen_python`` and ``server.Client``.
        ident_key: forwarded to ``server.popen_python`` only.
        clean_root: accepted for signature compatibility; not used here.

        Raises AssertionError if the server's stderr reaches end-of-file
        before ``SERVER_READY.`` is seen; the message carries everything
        that was read up to that point.
        """
        if launch:
            python_code = (
                    "from %(classmodule)s import %(classname)s;"
                    "s = %(classname)s%(ctor_args)r;"
                    "s.run()" 
                % dict(
                    classname = self._ServerClass.__name__,
                    classmodule = self._ServerClassModule,
                    ctor_args = ctor_args
                ) )
            proc = server.popen_python(python_code,
                        communication = communication,
                        host = host,
                        port = port, 
                        user = user, 
                        agent = agent,
                        ident_key = ident_key, 
                        sudo = sudo,
                        environment_setup = environment_setup) 
            # Wait for the server to be ready, otherwise nobody
            # will be able to connect to it. The ready marker need not be
            # the first line: anything printed before it is collected so
            # it can be reported if the marker never shows up.
            err = []
            while True:
                helo = proc.stderr.readline()
                if helo == 'SERVER_READY.\n':
                    break
                if not helo:
                    # readline() returned '' -> EOF without the marker
                    raise AssertionError(
                        "Expected 'SERVER_READY.', got: %s" % (''.join(err),))
                err.append(helo)
        # connect client to server
        # NOTE(review): ident_key is used for launching but is not passed
        # to server.Client -- confirm this is intended.
        self._client = server.Client(root_dir, 
                communication = communication,
                host = host, 
                port = port, 
                user = user, 
                agent = agent, 
                sudo = sudo,
                environment_setup = environment_setup)
    
    @staticmethod
    def _make_message(argtypes, argencoders, command, methname, classname, *args):
        """
        Build the wire message for one stub call.

        argtypes: sequence of argument types, used only for error reporting.
        argencoders: sequence of ``(encode, fmt)`` pairs, one per argument;
            each argument is rendered as ``fmt % encode(value)``.
        command: integer instruction code placed at the start of the message.
        methname, classname: names used in error messages.
        args: the actual argument values.

        Returns ``"<command>|<arg1>|<arg2>|..."``.

        Raises ValueError when the lengths of argtypes, argencoders and
        args do not agree, and TypeError (with the nested traceback in
        the message) when an argument cannot be encoded.
        """
        if len(argtypes) != len(argencoders):
            raise ValueError("Invalid arguments for _make_message: "
                "in stub method %s of class %s "
                "argtypes and argencoders must match in size" % (
                    methname, classname ))
        if len(argtypes) != len(args):
            raise ValueError("Invalid arguments for _make_message: "
                "in stub method %s of class %s "
                "expected %d arguments, got %d" % (
                    methname, classname,
                    len(argtypes), len(args)))
        
        buf = []
        for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
            try:
                buf.append(fmt % encode(val))
            except Exception:
                # Only real errors are translated; KeyboardInterrupt and
                # SystemExit must propagate untouched.
                import traceback
                raise TypeError("Argument %d of stub method %s of class %s "
                    "requires a value of type %s, but got %s - nested error: %s" % (
                        argnum, methname, classname,
                        getattr(typ, '__name__', typ), type(val),
                        traceback.format_exc()
                ))
        
        return "%d|%s" % (command, '|'.join(buf))
    
    @staticmethod
    def _parse_reply(rvtype, methname, classname, reply):
        """
        Decode a server reply of the form ``"<code>|<text>"``.

        rvtype: callable applied to ``<text>`` to build the return value,
            or None when the call returns nothing.
        methname, classname: names used in error messages.
        reply: raw reply string.

        Returns ``rvtype(text)`` (or None when rvtype is None) if the
        code is OK.

        Raises RuntimeError for an empty reply, for an unknown code, and
        for an ERROR reply (the message is then the base64-decoded text);
        raises TypeError when the reply cannot be split/parsed or when
        rvtype fails on the text.
        """
        if not reply:
            raise RuntimeError("Invalid reply: %r "
                "for stub method %s of class %s" % (
                    reply,
                    methname,
                    classname))
        
        try:
            result = reply.split("|")
            code = int(result[0])
            text = result[1]
        except Exception:
            import traceback
            raise TypeError("Return value of stub method %s of class %s "
                "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
                    methname, classname,
                    getattr(rvtype, '__name__', rvtype), reply,
                    traceback.format_exc()
            ))
        if code == ERROR:
            text = base64.b64decode(text)
            raise RuntimeError(text)
        elif code == OK:
            try:
                if rvtype is None:
                    return
                else:
                    return rvtype(text)
            except Exception:
                import traceback
                raise TypeError("Return value of stub method %s of class %s "
                    "cannot be parsed: must be of type %s - nested error: %s" % (
                        methname, classname,
                        getattr(rvtype, '__name__', rvtype),
                        traceback.format_exc()
                ))
        else:
            raise RuntimeError("Invalid reply: %r "
                "for stub method %s of class %s - unknown code" % (
                    reply,
                    methname,
                    classname))
    
    @staticmethod
    def _make_stubs(server_class, template_class):
        """
        Returns a dictionary method_name -> method
        with stub methods.
        
        Usage:
        
            class SomeProxy(BaseProxy):
               ...
               
               locals().update( BaseProxy._make_stubs(
                    ServerClass,
                    TemplateClass
               ) )
        
        ServerClass is the corresponding Server class, as
        specified in the _ServerClass class attribute (_make_stubs
        is static and can't access it), and TemplateClass
        is the ultimate implementation class behind the server,
        from which argument names and defaults are taken, to
        maintain meaningful interfaces.
        
        For every stubbed method ``name`` the dictionary also holds
        ``name_deferred``. As a side effect, a ``name_deferred`` method
        that simply calls ``name`` is added to TemplateClass when it
        does not already have one.
        
        The stub source comes from the ``proxy_stub.tpl`` file located
        next to this module.
        """
        rv = {}
        
        # Sentinel for "argument has no default value"
        class NONE: pass
        
        import os.path
        func_template_path = os.path.join(
            os.path.dirname(__file__),
            'proxy_stub.tpl')
        func_template_file = open(func_template_path, "r")
        try:
            func_template = func_template_file.read()
        finally:
            # close the template even if reading fails
            func_template_file.close()
        
        # Iterate over a copy: template_class gets new attributes below
        for methname in vars(template_class).copy():
            if methname.endswith('_deferred'):
                # cannot wrap deferreds...
                continue
            dmethname = methname+'_deferred'
            if hasattr(server_class, methname) and not methname.startswith('_'):
                template_meth = getattr(template_class, methname)
                server_meth = getattr(server_class, methname)
                
                # Marshalling metadata looked up on the server method
                command = getattr(server_meth, '_handles_command', None)
                argtypes = getattr(server_meth, '_argtypes', None)
                argencoders = getattr(server_meth, '_argencoders', None)
                rvtype = getattr(server_meth, '_retval', None)
                doprop = False
                
                if hasattr(template_meth, 'fget'):
                    # property getter
                    template_meth = template_meth.fget
                    doprop = True
                
                if command is not None and argtypes is not None and argencoders is not None:
                    # We have an interface method...
                    code = template_meth.func_code
                    argnames = code.co_varnames[:code.co_argcount]
                    # Left-pad the defaults with NONE so they line up
                    # one-to-one with argnames
                    argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
                                  + (template_meth.func_defaults or ()) )
                    
                    func_globals = dict(
                        BaseProxy = BaseProxy,
                        argtypes = argtypes,
                        argencoders = argencoders,
                        rvtype = rvtype,
                        functools = functools,
                    )
                    context = dict()
                    
                    func_text = func_template % dict(
                        self = argnames[0],
                        args = '%s' % (','.join(argnames[1:])),
                        argdefs = ','.join([
                            argname if argdef is NONE
                            else "%s=%r" % (argname, argdef)
                            for argname, argdef in zip(argnames[1:], argdefaults[1:])
                        ]),
                        command = command,
                        methname = methname,
                        classname = server_class.__name__
                    )
                    
                    compiled_stub = compile(
                        func_text,
                        func_template_path,
                        'exec')
                    
                    # eval() runs code objects compiled in 'exec' mode
                    # (and returns None), exactly like the exec statement
                    # would with the same globals/locals, while also being
                    # valid syntax on every Python version.
                    eval(compiled_stub, func_globals, context)
                    
                    if doprop:
                        rv[methname] = property(context[methname])
                        rv[dmethname] = property(context[dmethname])
                    else:
                        rv[methname] = context[methname]
                        rv[dmethname] = context[dmethname]
                    
                    # inject _deferred into core classes
                    if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
                        # freezename binds the current names; a plain
                        # closure would see the loop's last values
                        def freezename(methname, dmethname):
                            def dmeth(self, *p, **kw): 
                                return getattr(self, methname)(*p, **kw)
                            dmeth.__name__ = dmethname
                            return dmeth
                        dmeth = freezename(methname, dmethname)
                        setattr(template_class, dmethname, dmeth)
        
        return rv
1105                         
class TestbedControllerProxy(BaseProxy):
    """
    Proxy whose server side is a TestbedControllerServer.

    The public methods are stubs generated by ``BaseProxy._make_stubs``
    from ``TestbedControllerServer``, with argument names and defaults
    taken from ``nepi.core.execute.TestbedController``.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, 
            testbed_id = None, 
            testbed_version = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "", 
            clean_root = False):
        """
        Optionally launch a TestbedControllerServer and connect to it.

        The server is constructed with ``(root_dir, log_level,
        testbed_id, testbed_version, environment_setup, clean_root)``;
        the remaining arguments are forwarded to ``BaseProxy.__init__``.

        Raises RuntimeError when ``launch`` is true but ``testbed_id``
        or ``testbed_version`` is None.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version,
                environment_setup, clean_root),
            root_dir = root_dir,
            launch = launch,
            communication = communication,
            host = host, 
            port = port, 
            user = user,
            ident_key = ident_key, 
            agent = agent, 
            sudo = sudo, 
            environment_setup = environment_setup,
            clean_root = clean_root)

    # Inject the generated stub methods into this class body
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    # The default argument captures the stub named ``shutdown`` injected
    # just above, before this def rebinds the name.
    def shutdown(self, _stub = shutdown):
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1151     
1152
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy whose server side is an ExperimentControllerServer.

    The public methods are stubs generated by ``BaseProxy._make_stubs``
    from ``ExperimentControllerServer``, with argument names and defaults
    taken from ``nepi.core.execute.ExperimentController``.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, 
            experiment_xml = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None, 
            sudo = False, 
            environment_setup = "",
            clean_root = False):
        """
        Optionally launch an ExperimentControllerServer and connect to it.

        The server is constructed with ``(root_dir, log_level,
        experiment_xml, environment_setup, clean_root)``; every other
        argument is handed on to ``BaseProxy.__init__``.
        """
        # Positional arguments for the server-side constructor
        server_ctor_args = (
            root_dir,
            log_level,
            experiment_xml,
            environment_setup,
            clean_root,
        )
        base = super(ExperimentControllerProxy, self)
        base.__init__(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            launch = launch,
            clean_root = clean_root,
            environment_setup = environment_setup,
            communication = communication,
            host = host,
            port = port,
            user = user,
            agent = agent,
            ident_key = ident_key,
            sudo = sudo)

    # Inject the generated stub methods into this class body
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    # Shutdown stops the serverside...
    # ``_stub`` captures the generated ``shutdown`` stub injected above,
    # before this def rebinds the name.
    def shutdown(self, _stub = shutdown):
        result = _stub(self)
        client = self._client
        client.send_stop()
        # wait for it
        client.read_reply()
        return result
1195