9545a5ca20ee034853e1db98dfd7b61b531a3cb4
[nepi.git] / src / nepi / util / proxy.py
1 # -*- coding: utf-8 -*-
2
3 import base64
4 import nepi.core.execute
5 import nepi.util.environ
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16 import os
17
# PROTOCOL REPLIES
# First "|"-separated field of every reply string.
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# First "|"-separated field of every request; servers dispatch on it.
# These numbers identify instructions on the wire, so keep them stable.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
TESTBED_STATUS = 43
STARTED_TIME = 44
STOPPED_TIME = 45
CURRENT = 46
ACCESS_CONFIGURATIONS = 47
CURRENT_ACCESS_CONFIG = 48


# Printable name of every protocol code above; log_msg() and log_reply()
# look codes up here.  Every code must have an entry, otherwise the
# corresponding message is silently left out of the debug log.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TESTBED_STATUS: "TESTBED_STATUS",
    TRACES_INFO: "TRACES_INFO",
    STARTED_TIME: "STARTED_TIME",
    STOPPED_TIME: "STOPPED_TIME",
    CURRENT: "CURRENT",
    ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
    CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG",
}
120
def log_msg(server, params):
    """
    Write a debug log line describing an incoming protocol message.

    server: object providing log_debug(text); its class name prefixes the line.
    params: the message already split on "|"; params[0] is the numeric
        instruction code, the remaining items are its arguments.

    Logging is best effort: any ordinary error (unparsable or unknown
    code, failing log sink, ...) is ignored so that it can never break
    request handling.
    """
    try:
        instr = int(params[0])
        instr_txt = instruction_text[instr]
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging -- but let KeyboardInterrupt/SystemExit
        # through, which a bare "except:" would have swallowed as well
        pass
130
def log_reply(server, reply):
    """
    Write a debug log line describing an outgoing reply string.

    server: object providing log_debug(text); its class name prefixes the line.
    reply: reply string of the form "<code>|<payload>".

    The payload is shown base64-decoded when it decodes, verbatim
    otherwise.  If the reply cannot be parsed at all it is logged as is.
    Logging is best effort: a failing log sink is ignored as well, so
    this function never raises an ordinary exception.
    """
    prefix = server.__class__.__name__
    try:
        fields = reply.split("|")
        code_txt = instruction_text[int(fields[0])]
        try:
            txt = base64.b64decode(fields[1])
        except Exception:
            # not base64: show the raw payload (a missing payload raises
            # again here and ends up in the fallback below)
            txt = fields[1]
        text = "%s - reply: %s %s" % (prefix, code_txt, txt)
    except Exception:
        # unparsable reply: fall back to logging it untouched
        text = "%s - reply: %s" % (prefix, reply)

    try:
        server.log_debug(text)
    except Exception:
        # don't die for logging
        pass
147
def to_server_log_level(log_level):
    """
    Reduce a configured log level to one of the two levels used for servers.

    Returns DC.DEBUG_LEVEL when log_level equals it, DC.ERROR_LEVEL for
    any other value.
    """
    if log_level == DC.DEBUG_LEVEL:
        return DC.DEBUG_LEVEL
    return DC.ERROR_LEVEL
154
def get_access_config_params(access_config):
    """
    Unpack the deployment settings held by an access configuration.

    access_config: object exposing get_attribute_value(name) and
        has_attribute(name) for the DeploymentConfiguration attributes.

    Returns the 13-tuple
        (mode, launch, root_dir, log_level, communication, user, host,
         port, key, agent, sudo, environment_setup, clean_root)
    where launch is the negation of the RECOVER attribute, log_level has
    been passed through to_server_log_level() and environment_setup is
    "" when the configuration has no such attribute.
    """
    value = access_config.get_attribute_value

    mode = value(DC.DEPLOYMENT_MODE)
    launch = not value(DC.RECOVER)
    root_dir = value(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value(DC.LOG_LEVEL))
    communication = value(DC.DEPLOYMENT_COMMUNICATION)
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
    else:
        environment_setup = ""
    user = value(DC.DEPLOYMENT_USER)
    host = value(DC.DEPLOYMENT_HOST)
    port = value(DC.DEPLOYMENT_PORT)
    agent = value(DC.USE_AGENT)
    sudo = value(DC.USE_SUDO)
    key = value(DC.DEPLOYMENT_KEY)
    clean_root = value(DC.CLEAN_ROOT)
    return (mode, launch, root_dir, log_level, communication, user, host, port,
            key, agent, sudo, environment_setup, clean_root)
177
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every attribute described in
    Metadata.PROXY_ATTRIBUTES.

    params, when given, maps attribute names to unparsed values; each
    value is converted with the parser registered for the attribute's
    type before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # imported inside the constructor, not at module level
        from nepi.core.metadata import Metadata

        for _attr_id, attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
192
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    The directory is created on construction (its location is available
    as .path) and removed, with its contents, when the object is
    finalized.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # "path" is missing if mkdtemp() itself failed; the directory may
        # also have been removed by someone else already.  Neither case
        # should produce an error from a finalizer.
        path = getattr(self, "path", None)
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
199
class PermDir(object):
    """
    Holder of a caller-supplied directory path, exposed as .path.

    Offers the same .path attribute as TempDir but never creates or
    removes anything.
    """
    def __init__(self, path):
        self.path = path
203
def create_experiment_suite(xml, access_config, repetitions = None,
        duration = None, wait_guids = None):
    """
    Build an experiment suite, in-process or behind a daemon proxy.

    xml: experiment description handed to the suite.
    access_config: access configuration selecting the deployment mode,
        or a false value for an in-process suite.
    repetitions, duration, wait_guids: passed through to the suite.

    Returns an ExperimentSuite when there is no mode or the mode is
    DC.MODE_SINGLE_PROCESS, an ExperimentSuiteProxy for DC.MODE_DAEMON.
    Raises RuntimeError for any other mode.
    """
    mode = None
    # must exist even without an access configuration: it is tested below
    root_dir = None
    if access_config:
        (mode, _launch, root_dir, log_level, communication, user, host, port,
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentSuite
        if root_dir:
            workdir = PermDir(root_dir)
        else:
            workdir = TempDir()

        exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
                wait_guids)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        exp_suite._tempdir = workdir
        return exp_suite
    elif mode == DC.MODE_DAEMON:
        return ExperimentSuiteProxy(root_dir, log_level,
                xml,
                repetitions = repetitions,
                duration = duration,
                wait_guids = wait_guids,
                communication = communication,
                host = host,
                port = port,
                user = user,
                ident_key = key,
                agent = agent,
                sudo = sudo,
                environment_setup = environment_setup,
                clean_root = clean_root)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
242
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller, in-process or behind a daemon proxy.

    xml: experiment description handed to the controller.
    access_config: optional access configuration; without one an
        in-process controller with a temporary root directory is built.

    Side effect: sets the NEPI_CONTROLLER_LOGLEVEL environment variable
    to the effective log level.

    When the configuration asks for recovery (launch is false), the
    controller's recover() is called.  In daemon mode, if attaching to
    the existing daemon fails, a new daemon is launched and recover() is
    called on it.

    Raises RuntimeError for an unsupported deployment mode; in daemon
    mode with launch true, errors creating the proxy propagate.
    """
    mode = None
    launch = True
    log_level = DC.ERROR_LEVEL
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port,
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController

        if access_config and access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        else:
            root_dir = TempDir()
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        if not launch:
            # try to recover
            controller.recover()

        return controller
    elif mode == DC.MODE_DAEMON:
        # everything but "launch" is identical for both attempts below
        proxy_args = dict(
            experiment_xml = xml,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
            clean_root = clean_root)
        try:
            return ExperimentControllerProxy(root_dir, log_level,
                launch = launch, **proxy_args)
        except Exception:
            # Exception only: an interrupt or interpreter exit must not
            # trigger a relaunch.
            if launch:
                raise
            # Maybe controller died, recover from persisted testbed information if possible
            controller = ExperimentControllerProxy(root_dir, log_level,
                launch = True, **proxy_args)
            controller.recover()
            return controller
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
306
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller, in-process or behind a daemon proxy.

    testbed_id, testbed_version: identify the testbed implementation.
    access_config: access configuration selecting the deployment mode,
        or a false value for an in-process controller.

    Side effect: sets the NEPI_CONTROLLER_LOGLEVEL environment variable
    to the effective log level.

    Returns the testbed's own TestbedController when there is no mode or
    the mode is DC.MODE_SINGLE_PROCESS, a TestbedControllerProxy for
    DC.MODE_DAEMON.

    Raises ValueError when recovery (launch false) is requested for an
    in-process controller, RuntimeError for an unsupported mode.
    """
    mode = None
    launch = True
    log_level = DC.ERROR_LEVEL
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port,
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == DC.MODE_DAEMON:
        return TestbedControllerProxy(root_dir, log_level,
                testbed_id = testbed_id,
                testbed_version = testbed_version,
                communication = communication,
                host = host,
                port = port,
                ident_key = key,
                user = user,
                agent = agent,
                sudo = sudo,
                launch = launch,
                environment_setup = environment_setup,
                clean_root = clean_root)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
337
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Import the module implementing a testbed and instantiate its controller.

    The module name comes from nepi.util.environ.find_testbed(testbed_id);
    the module is imported unless it is already loaded, and its
    TestbedController class is instantiated without arguments.

    Raises ImportError (listing sys.path) when the module cannot be
    imported, RuntimeError when the controller's testbed_version differs
    from the requested testbed_version.
    """
    mod_name = nepi.util.environ.find_testbed(testbed_id)

    if mod_name not in sys.modules:
        try:
            __import__(mod_name)
        except ImportError:
            raise ImportError(
                "Cannot find module %s in %r" % (mod_name, sys.path))

    # __import__ returns the top-level package; fetch the module itself
    module = sys.modules[mod_name]
    tc = module.TestbedController()
    if tc.testbed_version != testbed_version:
        raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
                (testbed_id, testbed_version, tc.testbed_version))
    return tc
353
# Just a namespace class
class Marshalling:
    """
    Namespace for the wire (de)serialization helpers and for the
    decorators that attach marshalling information to server handlers.

    Decoders turn a received string field into a value, Encoders do the
    reverse.  Every decoder is also reachable directly as an attribute of
    Marshalling (e.g. Marshalling.pickled_data) so that it can be used as
    a "type" in @Marshalling.args / @Marshalling.retval.
    """
    class Decoders:
        @staticmethod
        def pickled_data(sdata):
            # NOTE(review): unpickling can execute arbitrary code; this
            # is only safe when the peer sending sdata is trusted.
            return cPickle.loads(base64.b64decode(sdata))

        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)

        @staticmethod
        def nullint(sdata):
            # the literal text "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)

        @staticmethod
        def bool(sdata):
            return sdata == 'True'

    class Encoders:
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))

        @staticmethod
        def base64_data(data):
            if not data:
                return ""
            return base64.b64encode(data)

        @staticmethod
        def nullint(data):
            # encoders always return strings; the text is what "%s"
            # formatting of the former int result produced
            return "None" if data is None else str(int(data))

        @staticmethod
        def bool(data):
            # "bool" here is the builtin: class-level names are not
            # visible from inside a method body
            return str(bool(data))

    # import into Marshalling all the decoders
    # they act as types
    locals().update([
        (typname, typ)
        for typname, typ in vars(Decoders).items()
        if not typname.startswith('_')
    ])

    # typename -> (<encoding_function>, <formatting_string>)
    # Filled by a plain loop: a comprehension could not refer to the
    # class-level names Encoders/Decoders on every Python version.
    _TYPE_ENCODERS = {}
    for _typname in vars(Decoders):
        if not _typname.startswith('_') and hasattr(Encoders, _typname):
            _TYPE_ENCODERS[_typname] = (getattr(Encoders, _typname), "%s")

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")

    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")

    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).

        The first argument (self) is left untouched.  The first item of
        "params" (the instruction code) is skipped.  When "params" holds
        fewer values than there are types, the missing trailing arguments
        are simply not passed, so the function's defaults apply.

        The wrapper carries _argtypes (the types), _argencoders (the
        matching (encoder, format) pairs from _TYPE_ENCODERS) and _retval.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])

            rv._argtypes = types

            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            encoders = Marshalling._TYPE_ENCODERS
            generic = encoders[None]
            rv._argencoders = tuple(
                encoders.get(typ.__name__, generic) for typ in types)

            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that
        returns a properly encoded return string, given that the undecorated
        function returns suitable input for the encoding function.

        The result has the form "<OK>|<encoded value>".

        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt

        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor

    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that
        always return an encoded empty string ("<OK>|").

        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)

        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV

        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv

    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers.
        It should always be the topmost decorator.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
525
class BaseServer(server.Server):
    """
    Server that dispatches "|"-separated command strings to the methods
    of the concrete class tagged with @Marshalling.handles.
    """
    def reply_action(self, msg):
        """
        Execute one command string and return the reply string.

        msg: "<instruction code>|<arg>|..." as received from the peer.

        The handler is the public method, defined directly on the
        concrete class (inherited methods are not searched), whose
        _handles_command equals the instruction code; it is called with
        the split message.

        Always returns a reply: the handler's result, or
        "<ERROR>|<base64 text>" for an empty message, a malformed or
        unknown instruction code, or a handler that raised.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            try:
                # Parsed inside the try so that a non-numeric code is
                # answered with an ERROR reply like any other failure.
                instruction = int(params[0])
                log_msg(self, params)
                for mname,meth in vars(self.__class__).iteritems():
                    if mname.startswith('_'):
                        continue
                    if getattr(meth, '_handles_command', None) == instruction:
                        # look the name up on the instance to get a bound method
                        reply = getattr(self, mname)(params)
                        break
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Deliberately catches everything: whatever a handler
                # raises is reported to the peer as an ERROR reply.
                # NOTE(review): log_error() without arguments is assumed
                # to return the text of the current exception -- confirm.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
554
class ExperimentSuiteServer(BaseServer):
    """
    Server side of the experiment suite protocol: owns an ExperimentSuite
    (created in post_daemonize) and exposes its operations as commands.
    """
    def __init__(self, root_dir, log_level, 
            xml, repetitions, duration, wait_guids, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "", 
            clean_root = False):
        """
        Record the suite parameters and build the access configuration
        later handed to the ExperimentSuite.

        root_dir, log_level and environment_setup are always stored in
        the access configuration; the remaining deployment options are
        stored only when they have a true value.
        """
        super(ExperimentSuiteServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        access_config = AccessConfiguration()
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)

        # One table instead of one "if" per option, so that every option
        # is set on the same, correctly spelled, configuration object.
        optional_settings = (
            (DC.DEPLOYMENT_USER, user),
            (DC.DEPLOYMENT_HOST, host),
            (DC.DEPLOYMENT_PORT, port),
            (DC.USE_AGENT, agent),
            (DC.USE_SUDO, sudo),
            (DC.DEPLOYMENT_KEY, ident_key),
            (DC.DEPLOYMENT_COMMUNICATION, communication),
            (DC.CLEAN_ROOT, clean_root),
        )
        for attr_name, attr_value in optional_settings:
            if attr_value:
                access_config.set_attribute_value(attr_name, attr_value)

        self._experiment_xml = xml
        self._duration = duration
        self._repetitions = repetitions
        self._wait_guids = wait_guids
        self._access_config = access_config
        self._experiment_suite = None

    def post_daemonize(self):
        """Create the ExperimentSuite from the parameters stored by __init__."""
        from nepi.core.execute import ExperimentSuite
        self._experiment_suite = ExperimentSuite(
                self._experiment_xml, self._access_config, 
                self._repetitions, self._duration, self._wait_guids)

    @Marshalling.handles(CURRENT)
    @Marshalling.args()
    @Marshalling.retval(int)
    def current(self):
        """CURRENT: reply with the suite's current() value as an int."""
        return self._experiment_suite.current()
   
    @Marshalling.handles(STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def status(self):
        """STATUS: reply with the suite's status() value as an int."""
        return self._experiment_suite.status()
    
    @Marshalling.handles(FINISHED)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self):
        """FINISHED: reply with the suite's is_finished() value."""
        return self._experiment_suite.is_finished()

    @Marshalling.handles(ACCESS_CONFIGURATIONS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def get_access_configurations(self):
        """ACCESS_CONFIGURATIONS: reply with the pickled get_access_configurations() result."""
        return self._experiment_suite.get_access_configurations()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        """START: call the suite's start()."""
        self._experiment_suite.start()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        """SHUTDOWN: call the suite's shutdown()."""
        self._experiment_suite.shutdown()

    @Marshalling.handles(CURRENT_ACCESS_CONFIG)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def get_current_access_config(self):
        """CURRENT_ACCESS_CONFIG: reply with the pickled get_current_access_config() result."""
        return self._experiment_suite.get_current_access_config()
643
644 class TestbedControllerServer(BaseServer):
645     def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
646             environment_setup, clean_root):
647         super(TestbedControllerServer, self).__init__(root_dir, log_level, 
648             environment_setup = environment_setup, clean_root = clean_root)
649         self._testbed_id = testbed_id
650         self._testbed_version = testbed_version
651         self._testbed = None
652
653     def post_daemonize(self):
654         self._testbed = _build_testbed_controller(self._testbed_id, 
655                 self._testbed_version)
656
657     @Marshalling.handles(GUIDS)
658     @Marshalling.args()
659     @Marshalling.retval( Marshalling.pickled_data )
660     def guids(self):
661         return self._testbed.guids
662
663     @Marshalling.handles(TESTBED_ID)
664     @Marshalling.args()
665     @Marshalling.retval()
666     def testbed_id(self):
667         return str(self._testbed.testbed_id)
668
669     @Marshalling.handles(TESTBED_VERSION)
670     @Marshalling.args()
671     @Marshalling.retval()
672     def testbed_version(self):
673         return str(self._testbed.testbed_version)
674
675     @Marshalling.handles(CREATE)
676     @Marshalling.args(int, str)
677     @Marshalling.retvoid
678     def defer_create(self, guid, factory_id):
679         self._testbed.defer_create(guid, factory_id)
680
681     @Marshalling.handles(TRACE)
682     @Marshalling.args(int, str, Marshalling.base64_data)
683     @Marshalling.retval()
684     def trace(self, guid, trace_id, attribute):
685         return self._testbed.trace(guid, trace_id, attribute)
686
687     @Marshalling.handles(TRACES_INFO)
688     @Marshalling.args()
689     @Marshalling.retval( Marshalling.pickled_data )
690     def traces_info(self):
691         return self._testbed.traces_info()
692
693     @Marshalling.handles(START)
694     @Marshalling.args()
695     @Marshalling.retvoid
696     def start(self):
697         self._testbed.start()
698
699     @Marshalling.handles(STOP)
700     @Marshalling.args()
701     @Marshalling.retvoid
702     def stop(self):
703         self._testbed.stop()
704
705     @Marshalling.handles(SHUTDOWN)
706     @Marshalling.args()
707     @Marshalling.retvoid
708     def shutdown(self):
709         self._testbed.shutdown()
710
711     @Marshalling.handles(CONFIGURE)
712     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
713     @Marshalling.retvoid
714     def defer_configure(self, name, value):
715         self._testbed.defer_configure(name, value)
716
717     @Marshalling.handles(CREATE_SET)
718     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
719     @Marshalling.retvoid
720     def defer_create_set(self, guid, name, value):
721         self._testbed.defer_create_set(guid, name, value)
722
723     @Marshalling.handles(FACTORY_SET)
724     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
725     @Marshalling.retvoid
726     def defer_factory_set(self, name, value):
727         self._testbed.defer_factory_set(name, value)
728
729     @Marshalling.handles(CONNECT)
730     @Marshalling.args(int, str, int, str)
731     @Marshalling.retvoid
732     def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
733         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
734             connector_type_name2)
735
736     @Marshalling.handles(CROSS_CONNECT)
737     @Marshalling.args(int, str, int, int, str, str, str)
738     @Marshalling.retvoid
739     def defer_cross_connect(self, 
740             guid, connector_type_name,
741             cross_guid, cross_testbed_guid,
742             cross_testbed_id, cross_factory_id,
743             cross_connector_type_name):
744         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
745             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
746             cross_connector_type_name)
747
748     @Marshalling.handles(ADD_TRACE)
749     @Marshalling.args(int, str)
750     @Marshalling.retvoid
751     def defer_add_trace(self, guid, trace_id):
752         self._testbed.defer_add_trace(guid, trace_id)
753
754     @Marshalling.handles(ADD_ADDRESS)
755     @Marshalling.args(int, str, int, Marshalling.pickled_data)
756     @Marshalling.retvoid
757     def defer_add_address(self, guid, address, netprefix, broadcast):
758         self._testbed.defer_add_address(guid, address, netprefix,
759                 broadcast)
760
761     @Marshalling.handles(ADD_ROUTE)
762     @Marshalling.args(int, str, int, str, int)
763     @Marshalling.retvoid
764     def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
765         self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
766
767     @Marshalling.handles(DO_SETUP)
768     @Marshalling.args()
769     @Marshalling.retvoid
770     def do_setup(self):
771         self._testbed.do_setup()
772
773     @Marshalling.handles(DO_CREATE)
774     @Marshalling.args()
775     @Marshalling.retvoid
776     def do_create(self):
777         self._testbed.do_create()
778
779     @Marshalling.handles(DO_CONNECT_INIT)
780     @Marshalling.args()
781     @Marshalling.retvoid
782     def do_connect_init(self):
783         self._testbed.do_connect_init()
784
785     @Marshalling.handles(DO_CONNECT_COMPL)
786     @Marshalling.args()
787     @Marshalling.retvoid
788     def do_connect_compl(self):
789         self._testbed.do_connect_compl()
790
791     @Marshalling.handles(DO_CONFIGURE)
792     @Marshalling.args()
793     @Marshalling.retvoid
794     def do_configure(self):
795         self._testbed.do_configure()
796
797     @Marshalling.handles(DO_PRECONFIGURE)
798     @Marshalling.args()
799     @Marshalling.retvoid
800     def do_preconfigure(self):
801         self._testbed.do_preconfigure()
802
803     @Marshalling.handles(DO_PRESTART)
804     @Marshalling.args()
805     @Marshalling.retvoid
806     def do_prestart(self):
807         self._testbed.do_prestart()
808
809     @Marshalling.handles(DO_CROSS_CONNECT_INIT)
810     @Marshalling.args( Marshalling.Decoders.pickled_data )
811     @Marshalling.retvoid
812     def do_cross_connect_init(self, cross_data):
813         self._testbed.do_cross_connect_init(cross_data)
814
815     @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
816     @Marshalling.args( Marshalling.Decoders.pickled_data )
817     @Marshalling.retvoid
818     def do_cross_connect_compl(self, cross_data):
819         self._testbed.do_cross_connect_compl(cross_data)
820
821     @Marshalling.handles(GET)
822     @Marshalling.args(int, Marshalling.base64_data, str)
823     @Marshalling.retval( Marshalling.pickled_data )
824     def get(self, guid, name, time):
825         return self._testbed.get(guid, name, time)
826
827     @Marshalling.handles(SET)
828     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
829     @Marshalling.retvoid
830     def set(self, guid, name, value, time):
831         self._testbed.set(guid, name, value, time)
832
833     @Marshalling.handles(GET_ADDRESS)
834     @Marshalling.args(int, int, Marshalling.base64_data)
835     @Marshalling.retval()
836     def get_address(self, guid, index, attribute):
837         return str(self._testbed.get_address(guid, index, attribute))
838
839     @Marshalling.handles(GET_ROUTE)
840     @Marshalling.args(int, int, Marshalling.base64_data)
841     @Marshalling.retval()
842     def get_route(self, guid, index, attribute):
843         return str(self._testbed.get_route(guid, index, attribute))
844
845     @Marshalling.handles(ACTION)
846     @Marshalling.args(str, int, Marshalling.base64_data)
847     @Marshalling.retvoid
848     def action(self, time, guid, command):
849         self._testbed.action(time, guid, command)
850
    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        """
        Serve the STATUS command: return self._testbed.status(guid).

        The argument is declared as Marshalling.nullint (presumably an
        int that may also be None -- confirm against Marshalling); the
        reply type is declared as int.
        """
        return self._testbed.status(guid)
856
    @Marshalling.handles(TESTBED_STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def testbed_status(self):
        """
        Serve the TESTBED_STATUS command: return
        self._testbed.testbed_status().

        Takes no marshalled arguments; the reply type is declared as int.
        """
        return self._testbed.testbed_status()
862
    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        Serve the GET_ATTRIBUTE_LIST command: return
        self._testbed.get_attribute_list(guid, filter_flags, exclude).

        Declared argument types are int, Marshalling.nullint and
        Marshalling.bool (in the order guid, filter_flags, exclude); the
        reply type is declared as Marshalling.pickled_data.
        filter_flags defaults to None and exclude to False.
        """
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)
868
    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        """
        Serve the GET_FACTORY_ID command: return
        self._testbed.get_factory_id(guid).

        The guid argument is declared as int; Marshalling.retval() is
        used without an explicit reply type.
        """
        return self._testbed.get_factory_id(guid)
874
    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        """
        Serve the RECOVER command by calling recover() on the wrapped
        testbed.

        Declared with no marshalled arguments and with Marshalling.retvoid.
        """
        self._testbed.recover()
880
881
class ExperimentControllerServer(BaseServer):
    """
    Server that exposes a nepi.core.execute.ExperimentController through
    Marshalling-decorated command handlers.

    Each handler below is bound to one protocol command with
    Marshalling.handles() and simply delegates to the controller held in
    self._experiment, which is created in post_daemonize().
    """

    def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
            clean_root):
        """
        Store the experiment XML; the controller itself is not built
        here (self._experiment stays None until post_daemonize()).

        root_dir, log_level, environment_setup and clean_root are passed
        on to BaseServer.__init__.
        """
        super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        self._experiment_xml = experiment_xml
        self._experiment = None

    def post_daemonize(self):
        """
        Build the ExperimentController from the stored XML.

        The import is local to this method -- presumably so it only
        happens once the server process has daemonized (confirm against
        BaseServer). self._root_dir is not set in this class; presumably
        BaseServer.__init__ provides it.
        """
        from nepi.core.execute import ExperimentController
        self._experiment = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        """Serve GUIDS: return the controller's ``guids`` attribute."""
        return self._experiment.guids

    @Marshalling.handles(STARTED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def started_time(self):
        """Serve STARTED_TIME: return the controller's ``started_time`` attribute."""
        return self._experiment.started_time

    @Marshalling.handles(STOPPED_TIME)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def stopped_time(self):
        """Serve STOPPED_TIME: return the controller's ``stopped_time`` attribute."""
        return self._experiment.stopped_time

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        """Serve XML: return the controller's ``experiment_design_xml`` attribute."""
        return self._experiment.experiment_design_xml
        
    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        """Serve EXEC_XML: return the controller's ``experiment_execute_xml`` attribute."""
        return self._experiment.experiment_execute_xml
        
    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        """
        Serve TRACE: return str() of
        self._experiment.trace(guid, trace_id, attribute).
        """
        return str(self._experiment.trace(guid, trace_id, attribute))

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        """Serve TRACES_INFO: return self._experiment.traces_info()."""
        return self._experiment.traces_info()

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        """Serve FINISHED: return self._experiment.is_finished(guid)."""
        return self._experiment.is_finished(guid)

    @Marshalling.handles(STATUS)
    @Marshalling.args(int)
    @Marshalling.retval(int)
    def status(self, guid):
        """
        Serve STATUS: return self._experiment.status(guid).

        NOTE(review): guid is declared as plain int here, whereas the
        testbed server's STATUS handler declares Marshalling.nullint --
        confirm whether a None guid should be accepted here as well.
        """
        return self._experiment.status(guid)

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        """Serve GET: return self._experiment.get(guid, name, time)."""
        return self._experiment.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        """Serve SET: call self._experiment.set(guid, name, value, time)."""
        self._experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        """Serve START: call self._experiment.start()."""
        self._experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        """Serve STOP: call self._experiment.stop()."""
        self._experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        """Serve RECOVER: call self._experiment.recover()."""
        self._experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        """Serve SHUTDOWN: call self._experiment.shutdown()."""
        self._experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        """Serve GET_TESTBED_ID: return self._experiment.get_testbed_id(guid)."""
        return self._experiment.get_testbed_id(guid)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        """Serve GET_FACTORY_ID: return self._experiment.get_factory_id(guid)."""
        return self._experiment.get_factory_id(guid)

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        """Serve GET_TESTBED_VERSION: return self._experiment.get_testbed_version(guid)."""
        return self._experiment.get_testbed_version(guid)
1002
1003 class BaseProxy(object):
1004     _ServerClass = None
1005     _ServerClassModule = "nepi.util.proxy"
1006     
1007     def __init__(self, ctor_args, root_dir, 
1008             launch = True, 
1009             communication = DC.ACCESS_LOCAL,
1010             host = None, 
1011             port = None, 
1012             user = None, 
1013             ident_key = None, 
1014             agent = None,
1015             sudo = False, 
1016             environment_setup = "",
1017             clean_root = False):
1018         if launch:
1019             python_code = (
1020                     "from %(classmodule)s import %(classname)s;"
1021                     "s = %(classname)s%(ctor_args)r;"
1022                     "s.run()" 
1023                 % dict(
1024                     classname = self._ServerClass.__name__,
1025                     classmodule = self._ServerClassModule,
1026                     ctor_args = ctor_args
1027                 ) )
1028             proc = server.popen_python(python_code,
1029                         communication = communication,
1030                         host = host,
1031                         port = port, 
1032                         user = user, 
1033                         agent = agent,
1034                         ident_key = ident_key, 
1035                         sudo = sudo,
1036                         environment_setup = environment_setup) 
1037             # Wait for the server to be ready, otherwise nobody
1038             # will be able to connect to it
1039             err = []
1040             helo = "nope"
1041             while helo:
1042                 helo = proc.stderr.readline()
1043                 if helo == 'SERVER_READY.\n':
1044                     break
1045                 err.append(helo)
1046             else:
1047                 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
1048         # connect client to server
1049         self._client = server.Client(root_dir, 
1050                 communication = communication,
1051                 host = host, 
1052                 port = port, 
1053                 user = user, 
1054                 agent = agent, 
1055                 sudo = sudo,
1056                 environment_setup = environment_setup)
1057     
1058     @staticmethod
1059     def _make_message(argtypes, argencoders, command, methname, classname, *args):
1060         if len(argtypes) != len(argencoders):
1061             raise ValueError, "Invalid arguments for _make_message: "\
1062                 "in stub method %s of class %s "\
1063                 "argtypes and argencoders must match in size" % (
1064                     methname, classname )
1065         if len(argtypes) != len(args):
1066             raise ValueError, "Invalid arguments for _make_message: "\
1067                 "in stub method %s of class %s "\
1068                 "expected %d arguments, got %d" % (
1069                     methname, classname,
1070                     len(argtypes), len(args))
1071         
1072         buf = []
1073         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
1074             try:
1075                 buf.append(fmt % encode(val))
1076             except:
1077                 import traceback
1078                 raise TypeError, "Argument %d of stub method %s of class %s "\
1079                     "requires a value of type %s, but got %s - nested error: %s" % (
1080                         argnum, methname, classname,
1081                         getattr(typ, '__name__', typ), type(val),
1082                         traceback.format_exc()
1083                 )
1084         
1085         return "%d|%s" % (command, '|'.join(buf))
1086     
1087     @staticmethod
1088     def _parse_reply(rvtype, methname, classname, reply):
1089         if not reply:
1090             raise RuntimeError, "Invalid reply: %r "\
1091                 "for stub method %s of class %s" % (
1092                     reply,
1093                     methname,
1094                     classname)
1095         
1096         try:
1097             result = reply.split("|")
1098             code = int(result[0])
1099             text = result[1]
1100         except:
1101             import traceback
1102             raise TypeError, "Return value of stub method %s of class %s "\
1103                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
1104                     methname, classname,
1105                     getattr(rvtype, '__name__', rvtype), reply,
1106                     traceback.format_exc()
1107             )
1108         if code == ERROR:
1109             text = base64.b64decode(text)
1110             raise RuntimeError(text)
1111         elif code == OK:
1112             try:
1113                 if rvtype is None:
1114                     return
1115                 else:
1116                     return rvtype(text)
1117             except:
1118                 import traceback
1119                 raise TypeError, "Return value of stub method %s of class %s "\
1120                     "cannot be parsed: must be of type %s - nested error: %s" % (
1121                         methname, classname,
1122                         getattr(rvtype, '__name__', rvtype),
1123                         traceback.format_exc()
1124                 )
1125         else:
1126             raise RuntimeError, "Invalid reply: %r "\
1127                 "for stub method %s of class %s - unknown code" % (
1128                     reply,
1129                     methname,
1130                     classname)
1131     
1132     @staticmethod
1133     def _make_stubs(server_class, template_class):
1134         """
1135         Returns a dictionary method_name -> method
1136         with stub methods.
1137         
1138         Usage:
1139         
1140             class SomeProxy(BaseProxy):
1141                ...
1142                
1143                locals().update( BaseProxy._make_stubs(
1144                     ServerClass,
1145                     TemplateClass
1146                ) )
1147         
1148         ServerClass is the corresponding Server class, as
1149         specified in the _ServerClass class method (_make_stubs
1150         is static and can't access the method), and TemplateClass
1151         is the ultimate implementation class behind the server,
1152         from which argument names and defaults are taken, to
1153         maintain meaningful interfaces.
1154         """
1155         rv = {}
1156         
1157         class NONE: pass
1158         
1159         import os.path
1160         func_template_path = os.path.join(
1161             os.path.dirname(__file__),
1162             'proxy_stub.tpl')
1163         func_template_file = open(func_template_path, "r")
1164         func_template = func_template_file.read()
1165         func_template_file.close()
1166         
1167         for methname in vars(template_class).copy():
1168             if methname.endswith('_deferred'):
1169                 # cannot wrap deferreds...
1170                 continue
1171             dmethname = methname+'_deferred'
1172             if hasattr(server_class, methname) and not methname.startswith('_'):
1173                 template_meth = getattr(template_class, methname)
1174                 server_meth = getattr(server_class, methname)
1175                 
1176                 command = getattr(server_meth, '_handles_command', None)
1177                 argtypes = getattr(server_meth, '_argtypes', None)
1178                 argencoders = getattr(server_meth, '_argencoders', None)
1179                 rvtype = getattr(server_meth, '_retval', None)
1180                 doprop = False
1181                 
1182                 if hasattr(template_meth, 'fget'):
1183                     # property getter
1184                     template_meth = template_meth.fget
1185                     doprop = True
1186                 
1187                 if command is not None and argtypes is not None and argencoders is not None:
1188                     # We have an interface method...
1189                     code = template_meth.func_code
1190                     argnames = code.co_varnames[:code.co_argcount]
1191                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1192                                   + (template_meth.func_defaults or ()) )
1193                     
1194                     func_globals = dict(
1195                         BaseProxy = BaseProxy,
1196                         argtypes = argtypes,
1197                         argencoders = argencoders,
1198                         rvtype = rvtype,
1199                         functools = functools,
1200                     )
1201                     context = dict()
1202                     
1203                     func_text = func_template % dict(
1204                         self = argnames[0],
1205                         args = '%s' % (','.join(argnames[1:])),
1206                         argdefs = ','.join([
1207                             argname if argdef is NONE
1208                             else "%s=%r" % (argname, argdef)
1209                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
1210                         ]),
1211                         command = command,
1212                         methname = methname,
1213                         classname = server_class.__name__
1214                     )
1215                     
1216                     func_text = compile(
1217                         func_text,
1218                         func_template_path,
1219                         'exec')
1220                     
1221                     exec func_text in func_globals, context
1222                     
1223                     if doprop:
1224                         rv[methname] = property(context[methname])
1225                         rv[dmethname] = property(context[dmethname])
1226                     else:
1227                         rv[methname] = context[methname]
1228                         rv[dmethname] = context[dmethname]
1229                     
1230                     # inject _deferred into core classes
1231                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1232                         def freezename(methname, dmethname):
1233                             def dmeth(self, *p, **kw): 
1234                                 return getattr(self, methname)(*p, **kw)
1235                             dmeth.__name__ = dmethname
1236                             return dmeth
1237                         dmeth = freezename(methname, dmethname)
1238                         setattr(template_class, dmethname, dmeth)
1239         
1240         return rv
1241
class ExperimentSuiteProxy(BaseProxy):
    """
    Proxy for an ExperimentSuiteServer.

    The constructor always launches a new server (launch is fixed to
    True) and connects to it through BaseProxy. The public methods are
    the stubs generated from ExperimentSuiteServer and
    nepi.core.execute.ExperimentSuite, plus the shutdown() override
    defined at the end of the class.
    """
    
    _ServerClass = ExperimentSuiteServer
    
    def __init__(self, root_dir, log_level,
            xml, repetitions, duration, wait_guids, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "", 
            clean_root = False):
        # Positional arguments for the ExperimentSuiteServer constructor
        # on the server side; clean_root only travels inside this tuple.
        suite_server_args = (
            root_dir, log_level,
            xml, repetitions, duration, wait_guids,
            communication, host, port, user, ident_key, agent,
            sudo, environment_setup, clean_root,
        )
        # Access settings shared by the server launch and the client
        access_settings = dict(
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
        )
        super(ExperimentSuiteProxy, self).__init__(
            ctor_args = suite_server_args,
            root_dir = root_dir,
            launch = True,
            **access_settings)

    # Must run before shutdown() is defined below: the override takes the
    # generated "shutdown" stub as the default value of its _stub argument.
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentSuiteServer,
        template_class = nepi.core.execute.ExperimentSuite,
    ) )
    
    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then stop the server: send the
        stop message and block until its reply has been read. Returns
        whatever the stub returned.
        """
        stub_result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for it
        return stub_result
1294
class TestbedControllerProxy(BaseProxy):
    """
    Proxy for a TestbedControllerServer.

    The public methods are the stubs generated from
    TestbedControllerServer and nepi.core.execute.TestbedController,
    plus the shutdown() override defined at the end of the class.
    """
    
    _ServerClass = TestbedControllerServer
    
    def __init__(self, root_dir, log_level, 
            testbed_id = None, 
            testbed_version = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "", 
            clean_root = False):
        """
        Optionally launch a TestbedControllerServer and connect to it.

        testbed_id and testbed_version are mandatory when launch is
        true; with launch false the proxy only connects.

        The server constructor receives (root_dir, log_level,
        testbed_id, testbed_version, environment_setup, clean_root);
        the remaining arguments are handed to BaseProxy.__init__.

        Raises RuntimeError when launch is true and testbed_id or
        testbed_version is None.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        super(TestbedControllerProxy,self).__init__(
            ctor_args = (root_dir, log_level, testbed_id, testbed_version,
                environment_setup, clean_root),
            root_dir = root_dir,
            launch = launch,
            communication = communication,
            host = host, 
            port = port, 
            user = user,
            ident_key = ident_key, 
            agent = agent, 
            sudo = sudo, 
            environment_setup = environment_setup)

    # Must run before shutdown() is defined below: the override takes the
    # generated "shutdown" stub as the default value of its _stub argument.
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )
    
    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then stop the server: send the
        stop message and block until its reply has been read. Returns
        whatever the stub returned.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1340     
1341
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy for an ExperimentControllerServer.

    The public methods are the stubs generated from
    ExperimentControllerServer and
    nepi.core.execute.ExperimentController, plus the shutdown()
    override defined at the end of the class.
    """
    _ServerClass = ExperimentControllerServer
    
    def __init__(self, root_dir, log_level, 
            experiment_xml = None, 
            launch = True, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None, 
            sudo = False, 
            environment_setup = "",
            clean_root = False):
        # Positional arguments for the ExperimentControllerServer
        # constructor on the server side.
        controller_server_args = (
            root_dir, log_level, experiment_xml,
            environment_setup, clean_root,
        )
        # Settings handed through unchanged to BaseProxy.__init__
        base_settings = dict(
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
            clean_root = clean_root,
        )
        super(ExperimentControllerProxy, self).__init__(
            ctor_args = controller_server_args,
            root_dir = root_dir,
            launch = launch,
            **base_settings)

    # Must run before shutdown() is defined below: the override takes the
    # generated "shutdown" stub as the default value of its _stub argument.
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then stop the server: send the
        stop message and block until its reply has been read. Returns
        whatever the stub returned.
        """
        stub_result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for it
        return stub_result
1383