Added routes to OMF nodes
[nepi.git] / src / nepi / util / proxy.py
1 # -*- coding: utf-8 -*-
2
3 import base64
4 import nepi.core.execute
5 import nepi.util.environ
6 from nepi.core.attributes import AttributesMap, Attribute
7 from nepi.util import server, validation
8 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
9 import getpass
10 import cPickle
11 import sys
12 import time
13 import tempfile
14 import shutil
15 import functools
16 import os
17
18 # PROTOCOL REPLIES
19 OK = 0
20 ERROR = 1
21
22 # PROTOCOL INSTRUCTION MESSAGES
23 XML = 2 
24 TRACE   = 4
25 FINISHED    = 5
26 START   = 6
27 STOP    = 7
28 SHUTDOWN    = 8
29 CONFIGURE   = 9
30 CREATE      = 10
31 CREATE_SET  = 11
32 FACTORY_SET = 12
33 CONNECT     = 13
34 CROSS_CONNECT   = 14
35 ADD_TRACE   = 15
36 ADD_ADDRESS = 16
37 ADD_ROUTE   = 17
38 DO_SETUP    = 18
39 DO_CREATE   = 19
40 DO_CONNECT_INIT = 20
41 DO_CONFIGURE    = 21
42 DO_CROSS_CONNECT_INIT   = 22
43 GET = 23
44 SET = 24
45 ACTION  = 25
46 STATUS  = 26
47 GUIDS  = 27
48 GET_ROUTE = 28
49 GET_ADDRESS = 29
50 RECOVER = 30
51 DO_PRECONFIGURE     = 31
52 GET_ATTRIBUTE_LIST  = 32
53 DO_CONNECT_COMPL    = 33
54 DO_CROSS_CONNECT_COMPL  = 34
55 TESTBED_ID  = 35
56 TESTBED_VERSION  = 36
57 DO_PRESTART = 37
58 GET_FACTORY_ID = 38
59 GET_TESTBED_ID = 39
60 GET_TESTBED_VERSION = 40
61 TRACES_INFO = 41
62 EXEC_XML = 42
63 TESTBED_STATUS  = 43
64 STARTED_TIME  = 44
65 STOPPED_TIME  = 45
66 CURRENT = 46
67 ACCESS_CONFIGURATIONS = 47
68 CURRENT_ACCESS_CONFIG = 48
69
70
71 instruction_text = dict({
72     OK:     "OK",
73     ERROR:  "ERROR",
74     XML:    "XML",
75     EXEC_XML:    "EXEC_XML",
76     TRACE:  "TRACE",
77     FINISHED:   "FINISHED",
78     START:  "START",
79     STOP:   "STOP",
80     RECOVER: "RECOVER",
81     SHUTDOWN:   "SHUTDOWN",
82     CONFIGURE:  "CONFIGURE",
83     CREATE: "CREATE",
84     CREATE_SET: "CREATE_SET",
85     FACTORY_SET:    "FACTORY_SET",
86     CONNECT:    "CONNECT",
87     CROSS_CONNECT: "CROSS_CONNECT",
88     ADD_TRACE:  "ADD_TRACE",
89     ADD_ADDRESS:    "ADD_ADDRESS",
90     ADD_ROUTE:  "ADD_ROUTE",
91     DO_SETUP:   "DO_SETUP",
92     DO_CREATE:  "DO_CREATE",
93     DO_CONNECT_INIT: "DO_CONNECT_INIT",
94     DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
95     DO_CONFIGURE:   "DO_CONFIGURE",
96     DO_PRECONFIGURE:   "DO_PRECONFIGURE",
97     DO_CROSS_CONNECT_INIT:  "DO_CROSS_CONNECT_INIT",
98     DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
99     GET:    "GET",
100     SET:    "SET",
101     GET_ROUTE: "GET_ROUTE",
102     GET_ADDRESS: "GET_ADDRESS",
103     GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
104     GET_FACTORY_ID: "GET_FACTORY_ID",
105     GET_TESTBED_ID: "GET_TESTBED_ID",
106     GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
107     ACTION: "ACTION",
108     STATUS: "STATUS",
109     GUIDS:  "GUIDS",
110     TESTBED_ID: "TESTBED_ID",
111     TESTBED_VERSION: "TESTBED_VERSION",
112     TRACES_INFO: "TRACES_INFO",
113     STARTED_TIME: "STARTED_TIME",
114     STOPPED_TIME: "STOPPED_TIME",
115     CURRENT: "CURRENT",
116     ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
117     CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG"
118
119     })
120
121 def log_msg(server, params):
122     try:
123         instr = int(params[0])
124         instr_txt = instruction_text[instr]
125         server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
126             instr_txt, ", ".join(map(str, params[1:]))))
127     except:
128         # don't die for logging
129         pass
130
131 def log_reply(server, reply):
132     try:
133         res = reply.split("|")
134         code = int(res[0])
135         code_txt = instruction_text[code]
136         try:
137             txt = base64.b64decode(res[1])
138         except:
139             txt = res[1]
140         server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
141                 code_txt, txt))
142     except:
143         # don't die for logging
144         server.log_debug("%s - reply: %s" % (server.__class__.__name__, 
145                 reply))
146         pass
147
148 def to_server_log_level(log_level):
149     return (
150         DC.DEBUG_LEVEL
151             if log_level == DC.DEBUG_LEVEL 
152         else DC.ERROR_LEVEL
153     )
154
155 def get_access_config_params(access_config):
156     mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
157     launch = not access_config.get_attribute_value(DC.RECOVER)
158     root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
159     log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
160     log_level = to_server_log_level(log_level)
161     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
162     environment_setup = (
163         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
164         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
165         else ""
166     )
167     user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
168     host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
169     port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
170     agent = access_config.get_attribute_value(DC.USE_AGENT)
171     sudo = access_config.get_attribute_value(DC.USE_SUDO)
172     key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
173     communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
174     clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
175     return (mode, launch, root_dir, log_level, communication, user, host, port,
176             key, agent, sudo, environment_setup, clean_root)
177
178 class AccessConfiguration(AttributesMap):
179     def __init__(self, params = None):
180         super(AccessConfiguration, self).__init__()
181         
182         from nepi.core.metadata import Metadata
183         
184         for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
185             self.add_attribute(**attr_info)
186
187         if params:
188             for attr_name, attr_value in params.iteritems():
189                 parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
190                 attr_value = parser(attr_value)
191                 self.set_attribute_value(attr_name, attr_value)
192
193 class TempDir(object):
194     def __init__(self):
195         self.path = tempfile.mkdtemp()
196     
197     def __del__(self):
198         shutil.rmtree(self.path)
199
200 class PermDir(object):
201     def __init__(self, path):
202         self.path = path
203
204 def create_experiment_suite(xml, access_config, repetitions = None,
205         duration = None, wait_guids = None):
206     mode = None
207     if access_config :
208         (mode, launch, root_dir, log_level, communication, user, host, port, 
209                 key, agent, sudo, environment_setup, clean_root) \
210                         = get_access_config_params(access_config)
211
212     if not mode or mode == DC.MODE_SINGLE_PROCESS:
213         from nepi.core.execute import ExperimentSuite
214         if not root_dir:
215             root_dir = TempDir()
216         else:
217             root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
218
219         exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
220                 wait_guids)
221         
222         # inject reference to temporary dir, so that it gets cleaned
223         # up at destruction time.
224         exp_suite._tempdir = root_dir
225         return exp_suite
226     elif mode == DC.MODE_DAEMON:
227         return ExperimentSuiteProxy(root_dir, log_level,
228                 xml,
229                 repetitions = repetitions, 
230                 duration = duration,
231                 wait_guids = wait_guids, 
232                 communication = communication,
233                 host = host, 
234                 port = port, 
235                 user = user, 
236                 ident_key = key,
237                 agent = agent, 
238                 sudo = sudo, 
239                 environment_setup = environment_setup, 
240                 clean_root = clean_root)
241     raise RuntimeError("Unsupported access configuration '%s'" % mode)
242
243 def create_experiment_controller(xml, access_config = None):
244     mode = None
245     launch = True
246     log_level = DC.ERROR_LEVEL
247     if access_config:
248         (mode, launch, root_dir, log_level, communication, user, host, port, 
249                 key, agent, sudo, environment_setup, clean_root) \
250                         = get_access_config_params(access_config)
251
252     os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
253
254     if not mode or mode == DC.MODE_SINGLE_PROCESS:
255         from nepi.core.execute import ExperimentController
256         
257         if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
258             root_dir = TempDir()
259         else:
260             root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
261         controller = ExperimentController(xml, root_dir.path)
262         
263         # inject reference to temporary dir, so that it gets cleaned
264         # up at destruction time.
265         controller._tempdir = root_dir
266         
267         if not launch:
268             # try to recover
269             controller.recover()
270         
271         return controller
272     elif mode == DC.MODE_DAEMON:
273         try:
274             return ExperimentControllerProxy(root_dir, log_level,
275                 experiment_xml = xml,
276                 communication = communication,
277                 host = host, 
278                 port = port, 
279                 user = user, 
280                 ident_key = key,
281                 agent = agent, 
282                 sudo = sudo, 
283                 launch = launch,
284                 environment_setup = environment_setup, 
285                 clean_root = clean_root)
286         except:
287             if not launch:
288                 # Maybe controller died, recover from persisted testbed information if possible
289                 controller = ExperimentControllerProxy(root_dir, log_level,
290                     experiment_xml = xml,
291                     communication = communication,
292                     host = host, 
293                     port = port, 
294                     user = user, 
295                     ident_key = key,
296                     agent = agent, 
297                     sudo = sudo, 
298                     launch = True,
299                     environment_setup = environment_setup,
300                     clean_root = clean_root)
301                 controller.recover()
302                 return controller
303             else:
304                 raise
305     raise RuntimeError("Unsupported access configuration '%s'" % mode)
306
307 def create_testbed_controller(testbed_id, testbed_version, access_config):
308     mode = None
309     launch = True
310     log_level = DC.ERROR_LEVEL
311     if access_config:
312         (mode, launch, root_dir, log_level, communication, user, host, port, 
313                 key, agent, sudo, environment_setup, clean_root) \
314                         = get_access_config_params(access_config)
315
316     os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
317     
318     if not mode or mode == DC.MODE_SINGLE_PROCESS:
319         if not launch:
320             raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
321         return  _build_testbed_controller(testbed_id, testbed_version)
322     elif mode == DC.MODE_DAEMON:
323         return TestbedControllerProxy(root_dir, log_level, 
324                 testbed_id = testbed_id, 
325                 testbed_version = testbed_version,
326                 communication = communication,
327                 host = host, 
328                 port = port, 
329                 ident_key = key,
330                 user = user, 
331                 agent = agent, 
332                 sudo = sudo, 
333                 launch = launch,
334                 environment_setup = environment_setup, 
335                 clean_root = clean_root)
336     raise RuntimeError("Unsupported access configuration '%s'" % mode)
337
338 def _build_testbed_controller(testbed_id, testbed_version):
339     mod_name = nepi.util.environ.find_testbed(testbed_id)
340     
341     if not mod_name in sys.modules:
342         try:
343             __import__(mod_name)
344         except ImportError:
345             raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
346     
347     module = sys.modules[mod_name]
348     tc = module.TestbedController()
349     if tc.testbed_version != testbed_version:
350         raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
351                 (testbed_id, testbed_version, tc.testbed_version))
352     return tc
353
354 # Just a namespace class
355 class Marshalling:
356     class Decoders:
357         @staticmethod
358         def pickled_data(sdata):
359             return cPickle.loads(base64.b64decode(sdata))
360         
361         @staticmethod
362         def base64_data(sdata):
363             return base64.b64decode(sdata)
364         
365         @staticmethod
366         def nullint(sdata):
367             return None if sdata == "None" else int(sdata)
368
369         @staticmethod
370         def bool(sdata):
371             return sdata == 'True'
372         
373     class Encoders:
374         @staticmethod
375         def pickled_data(data):
376             return base64.b64encode(cPickle.dumps(data))
377         
378         @staticmethod
379         def base64_data(data):
380             if not data:
381                 return ""
382             return base64.b64encode(data)
383         
384         @staticmethod
385         def nullint(data):
386             return "None" if data is None else int(data)
387         
388         @staticmethod
389         def bool(data):
390             return str(bool(data))
391            
392     # import into Marshalling all the decoders
393     # they act as types
394     locals().update([
395         (typname, typ)
396         for typname, typ in vars(Decoders).iteritems()
397         if not typname.startswith('_')
398     ])
399
400     _TYPE_ENCODERS = dict([
401         # id(type) -> (<encoding_function>, <formatting_string>)
402         (typname, (getattr(Encoders,typname),"%s"))
403         for typname in vars(Decoders)
404         if not typname.startswith('_')
405            and hasattr(Encoders,typname)
406     ])
407
408     # Builtins
409     _TYPE_ENCODERS["float"] = (float, "%r")
410     _TYPE_ENCODERS["int"] = (int, "%d")
411     _TYPE_ENCODERS["long"] = (int, "%d")
412     _TYPE_ENCODERS["str"] = (str, "%s")
413     _TYPE_ENCODERS["unicode"] = (str, "%s")
414     
415     # Generic encoder
416     _TYPE_ENCODERS[None] = (str, "%s")
417     
418     @staticmethod
419     def args(*types):
420         """
421         Decorator that converts the given function into one that takes
422         a single "params" list, with each parameter marshalled according
423         to the given factory callable (type constructors are accepted).
424         
425         The first argument (self) is left untouched.
426         
427         eg:
428         
429         @Marshalling.args(int,int,str,base64_data)
430         def somefunc(self, someint, otherint, somestr, someb64):
431            return someretval
432         """
433         def decor(f):
434             @functools.wraps(f)
435             def rv(self, params):
436                 return f(self, *[ ctor(val)
437                                   for ctor,val in zip(types, params[1:]) ])
438             
439             rv._argtypes = types
440             
441             # Derive type encoders by looking up types in _TYPE_ENCODERS
442             # make_proxy will use it to encode arguments in command strings
443             argencoders = []
444             TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
445             for typ in types:
446                 if typ.__name__ in TYPE_ENCODERS:
447                     argencoders.append(TYPE_ENCODERS[typ.__name__])
448                 else:
449                     # generic encoder
450                     argencoders.append(TYPE_ENCODERS[None])
451             
452             rv._argencoders = tuple(argencoders)
453             
454             rv._retval = getattr(f, '_retval', None)
455             return rv
456         return decor
457
458     @staticmethod
459     def retval(typ=Decoders.base64_data):
460         """
461         Decorator that converts the given function into one that 
462         returns a properly encoded return string, given that the undecorated
463         function returns suitable input for the encoding function.
464         
465         The optional typ argument specifies a type.
466         For the default of base64_data, return values should be strings.
467         The return value of the encoding method should be a string always.
468         
469         eg:
470         
471         @Marshalling.args(int,int,str,base64_data)
472         @Marshalling.retval(str)
473         def somefunc(self, someint, otherint, somestr, someb64):
474            return someint
475         """
476         encode, fmt = Marshalling._TYPE_ENCODERS.get(
477             typ.__name__,
478             Marshalling._TYPE_ENCODERS[None])
479         fmt = "%d|"+fmt
480         
481         def decor(f):
482             @functools.wraps(f)
483             def rv(self, *p, **kw):
484                 data = f(self, *p, **kw)
485                 return fmt % (
486                     OK,
487                     encode(data)
488                 )
489             rv._retval = typ
490             rv._argtypes = getattr(f, '_argtypes', None)
491             rv._argencoders = getattr(f, '_argencoders', None)
492             return rv
493         return decor
494     
495     @staticmethod
496     def retvoid(f):
497         """
498         Decorator that converts the given function into one that 
499         always return an encoded empty string.
500         
501         Useful for null-returning functions.
502         """
503         OKRV = "%d|" % (OK,)
504         
505         @functools.wraps(f)
506         def rv(self, *p, **kw):
507             f(self, *p, **kw)
508             return OKRV
509         
510         rv._retval = None
511         rv._argtypes = getattr(f, '_argtypes', None)
512         rv._argencoders = getattr(f, '_argencoders', None)
513         return rv
514     
515     @staticmethod
516     def handles(whichcommand):
517         """
518         Associates the method with a given command code for servers.
519         It should always be the topmost decorator.
520         """
521         def decor(f):
522             f._handles_command = whichcommand
523             return f
524         return decor
525
526 class BaseServer(server.Server):
527     def reply_action(self, msg):
528         if not msg:
529             result = base64.b64encode("Invalid command line")
530             reply = "%d|%s" % (ERROR, result)
531         else:
532             params = msg.split("|")
533             instruction = int(params[0])
534             log_msg(self, params)
535             try:
536                 for mname,meth in vars(self.__class__).iteritems():
537                     if not mname.startswith('_'):
538                         cmd = getattr(meth, '_handles_command', None)
539                         if cmd == instruction:
540                             meth = getattr(self, mname)
541                             reply = meth(params)
542                             break
543                 else:
544                     error = "Invalid instruction %s" % instruction
545                     self.log_error(error)
546                     result = base64.b64encode(error)
547                     reply = "%d|%s" % (ERROR, result)
548             except:
549                 error = self.log_error()
550                 result = base64.b64encode(error)
551                 reply = "%d|%s" % (ERROR, result)
552         log_reply(self, reply)
553         return reply
554
555 class ExperimentSuiteServer(BaseServer):
556     def __init__(self, root_dir, log_level, 
557             xml, repetitions, duration, wait_guids, 
558             communication = DC.ACCESS_LOCAL,
559             host = None, 
560             port = None, 
561             user = None, 
562             ident_key = None, 
563             agent = None,
564             sudo = False, 
565             environment_setup = "", 
566             clean_root = False):
567         super(ExperimentSuiteServer, self).__init__(root_dir, log_level, 
568             environment_setup = environment_setup, clean_root = clean_root)
569         access_config = AccessConfiguration()
570         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
571         access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
572         access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
573         if user:
574             access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
575         if host:
576             access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
577         if port:
578             access_config.set_attribute_value(DC.DEPLOYMENT_PORT, port)
579         if agent:    
580             access_config.set_attribute_value(DC.USE_AGENT, agent)
581         if sudo:
582             acess_config.set_attribute_value(DC.USE_SUDO, sudo)
583         if ident_key:
584             access_config.set_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
585         if communication:
586             access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
587         if clean_root:
588             access_config.set_attribute_value(DC.CLEAN_ROOT, clean_root)
589         self._experiment_xml = xml
590         self._duration = duration
591         self._repetitions = repetitions
592         self._wait_guids = wait_guids
593         self._access_config = access_config
594         self._experiment_suite = None
595
596     def post_daemonize(self):
597         from nepi.core.execute import ExperimentSuite
598         self._experiment_suite = ExperimentSuite(
599                 self._experiment_xml, self._access_config, 
600                 self._repetitions, self._duration, self._wait_guids)
601
602     @Marshalling.handles(CURRENT)
603     @Marshalling.args()
604     @Marshalling.retval(int)
605     def current(self):
606         return self._experiment_suite.current()
607    
608     @Marshalling.handles(STATUS)
609     @Marshalling.args()
610     @Marshalling.retval(int)
611     def status(self):
612         return self._experiment_suite.status()
613     
614     @Marshalling.handles(FINISHED)
615     @Marshalling.args()
616     @Marshalling.retval(Marshalling.bool)
617     def is_finished(self):
618         return self._experiment_suite.is_finished()
619
620     @Marshalling.handles(ACCESS_CONFIGURATIONS)
621     @Marshalling.args()
622     @Marshalling.retval( Marshalling.pickled_data )
623     def get_access_configurations(self):
624         return self._experiment_suite.get_access_configurations()
625
626     @Marshalling.handles(START)
627     @Marshalling.args()
628     @Marshalling.retvoid
629     def start(self):
630         self._experiment_suite.start()
631
632     @Marshalling.handles(SHUTDOWN)
633     @Marshalling.args()
634     @Marshalling.retvoid
635     def shutdown(self):
636         self._experiment_suite.shutdown()
637
638     @Marshalling.handles(CURRENT_ACCESS_CONFIG)
639     @Marshalling.args()
640     @Marshalling.retval( Marshalling.pickled_data )
641     def get_current_access_config(self):
642         return self._experiment_suite.get_current_access_config()
643
644 class TestbedControllerServer(BaseServer):
645     def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
646             environment_setup, clean_root):
647         super(TestbedControllerServer, self).__init__(root_dir, log_level, 
648             environment_setup = environment_setup, clean_root = clean_root)
649         self._testbed_id = testbed_id
650         self._testbed_version = testbed_version
651         self._testbed = None
652
653     def post_daemonize(self):
654         self._testbed = _build_testbed_controller(self._testbed_id, 
655                 self._testbed_version)
656
657     @Marshalling.handles(GUIDS)
658     @Marshalling.args()
659     @Marshalling.retval( Marshalling.pickled_data )
660     def guids(self):
661         return self._testbed.guids
662
663     @Marshalling.handles(TESTBED_ID)
664     @Marshalling.args()
665     @Marshalling.retval()
666     def testbed_id(self):
667         return str(self._testbed.testbed_id)
668
669     @Marshalling.handles(TESTBED_VERSION)
670     @Marshalling.args()
671     @Marshalling.retval()
672     def testbed_version(self):
673         return str(self._testbed.testbed_version)
674
675     @Marshalling.handles(CREATE)
676     @Marshalling.args(int, str)
677     @Marshalling.retvoid
678     def defer_create(self, guid, factory_id):
679         self._testbed.defer_create(guid, factory_id)
680
681     @Marshalling.handles(TRACE)
682     @Marshalling.args(int, str, Marshalling.base64_data)
683     @Marshalling.retval()
684     def trace(self, guid, trace_id, attribute):
685         return self._testbed.trace(guid, trace_id, attribute)
686
687     @Marshalling.handles(TRACES_INFO)
688     @Marshalling.args()
689     @Marshalling.retval( Marshalling.pickled_data )
690     def traces_info(self):
691         return self._testbed.traces_info()
692
693     @Marshalling.handles(START)
694     @Marshalling.args()
695     @Marshalling.retvoid
696     def start(self):
697         self._testbed.start()
698
699     @Marshalling.handles(STOP)
700     @Marshalling.args()
701     @Marshalling.retvoid
702     def stop(self):
703         self._testbed.stop()
704
705     @Marshalling.handles(SHUTDOWN)
706     @Marshalling.args()
707     @Marshalling.retvoid
708     def shutdown(self):
709         self._testbed.shutdown()
710
711     @Marshalling.handles(CONFIGURE)
712     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
713     @Marshalling.retvoid
714     def defer_configure(self, name, value):
715         self._testbed.defer_configure(name, value)
716
717     @Marshalling.handles(CREATE_SET)
718     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
719     @Marshalling.retvoid
720     def defer_create_set(self, guid, name, value):
721         self._testbed.defer_create_set(guid, name, value)
722
723     @Marshalling.handles(FACTORY_SET)
724     @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
725     @Marshalling.retvoid
726     def defer_factory_set(self, name, value):
727         self._testbed.defer_factory_set(name, value)
728
729     @Marshalling.handles(CONNECT)
730     @Marshalling.args(int, str, int, str)
731     @Marshalling.retvoid
732     def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
733         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
734             connector_type_name2)
735
736     @Marshalling.handles(CROSS_CONNECT)
737     @Marshalling.args(int, str, int, int, str, str, str)
738     @Marshalling.retvoid
739     def defer_cross_connect(self, 
740             guid, connector_type_name,
741             cross_guid, cross_testbed_guid,
742             cross_testbed_id, cross_factory_id,
743             cross_connector_type_name):
744         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
745             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
746             cross_connector_type_name)
747
748     @Marshalling.handles(ADD_TRACE)
749     @Marshalling.args(int, str)
750     @Marshalling.retvoid
751     def defer_add_trace(self, guid, trace_id):
752         self._testbed.defer_add_trace(guid, trace_id)
753
754     @Marshalling.handles(ADD_ADDRESS)
755     @Marshalling.args(int, str, int, Marshalling.pickled_data)
756     @Marshalling.retvoid
757     def defer_add_address(self, guid, address, netprefix, broadcast):
758         self._testbed.defer_add_address(guid, address, netprefix,
759                 broadcast)
760
761     @Marshalling.handles(ADD_ROUTE)
762     @Marshalling.args(int, str, int, str, int)
763     @Marshalling.retvoid
764     def defer_add_route(self, guid, destination, netprefix, nexthop, 
765             metric, device):
766         self._testbed.defer_add_route(guid, destination, netprefix, nexthop, 
767                 metric, device)
768
769     @Marshalling.handles(DO_SETUP)
770     @Marshalling.args()
771     @Marshalling.retvoid
772     def do_setup(self):
773         self._testbed.do_setup()
774
775     @Marshalling.handles(DO_CREATE)
776     @Marshalling.args()
777     @Marshalling.retvoid
778     def do_create(self):
779         self._testbed.do_create()
780
781     @Marshalling.handles(DO_CONNECT_INIT)
782     @Marshalling.args()
783     @Marshalling.retvoid
784     def do_connect_init(self):
785         self._testbed.do_connect_init()
786
787     @Marshalling.handles(DO_CONNECT_COMPL)
788     @Marshalling.args()
789     @Marshalling.retvoid
790     def do_connect_compl(self):
791         self._testbed.do_connect_compl()
792
793     @Marshalling.handles(DO_CONFIGURE)
794     @Marshalling.args()
795     @Marshalling.retvoid
796     def do_configure(self):
797         self._testbed.do_configure()
798
799     @Marshalling.handles(DO_PRECONFIGURE)
800     @Marshalling.args()
801     @Marshalling.retvoid
802     def do_preconfigure(self):
803         self._testbed.do_preconfigure()
804
805     @Marshalling.handles(DO_PRESTART)
806     @Marshalling.args()
807     @Marshalling.retvoid
808     def do_prestart(self):
809         self._testbed.do_prestart()
810
811     @Marshalling.handles(DO_CROSS_CONNECT_INIT)
812     @Marshalling.args( Marshalling.Decoders.pickled_data )
813     @Marshalling.retvoid
814     def do_cross_connect_init(self, cross_data):
815         self._testbed.do_cross_connect_init(cross_data)
816
817     @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
818     @Marshalling.args( Marshalling.Decoders.pickled_data )
819     @Marshalling.retvoid
820     def do_cross_connect_compl(self, cross_data):
821         self._testbed.do_cross_connect_compl(cross_data)
822
823     @Marshalling.handles(GET)
824     @Marshalling.args(int, Marshalling.base64_data, str)
825     @Marshalling.retval( Marshalling.pickled_data )
826     def get(self, guid, name, time):
827         return self._testbed.get(guid, name, time)
828
829     @Marshalling.handles(SET)
830     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
831     @Marshalling.retvoid
832     def set(self, guid, name, value, time):
833         self._testbed.set(guid, name, value, time)
834
835     @Marshalling.handles(GET_ADDRESS)
836     @Marshalling.args(int, int, Marshalling.base64_data)
837     @Marshalling.retval()
838     def get_address(self, guid, index, attribute):
839         return str(self._testbed.get_address(guid, index, attribute))
840
841     @Marshalling.handles(GET_ROUTE)
842     @Marshalling.args(int, int, Marshalling.base64_data)
843     @Marshalling.retval()
844     def get_route(self, guid, index, attribute):
845         return str(self._testbed.get_route(guid, index, attribute))
846
847     @Marshalling.handles(ACTION)
848     @Marshalling.args(str, int, Marshalling.base64_data)
849     @Marshalling.retvoid
850     def action(self, time, guid, command):
851         self._testbed.action(time, guid, command)
852
853     @Marshalling.handles(STATUS)
854     @Marshalling.args(Marshalling.nullint)
855     @Marshalling.retval(int)
856     def status(self, guid):
857         return self._testbed.status(guid)
858
859     @Marshalling.handles(TESTBED_STATUS)
860     @Marshalling.args()
861     @Marshalling.retval(int)
862     def testbed_status(self):
863         return self._testbed.testbed_status()
864
865     @Marshalling.handles(GET_ATTRIBUTE_LIST)
866     @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
867     @Marshalling.retval( Marshalling.pickled_data )
868     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
869         return self._testbed.get_attribute_list(guid, filter_flags, exclude)
870
871     @Marshalling.handles(GET_FACTORY_ID)
872     @Marshalling.args(int)
873     @Marshalling.retval()
874     def get_factory_id(self, guid):
875         return self._testbed.get_factory_id(guid)
876
877     @Marshalling.handles(RECOVER)
878     @Marshalling.args()
879     @Marshalling.retvoid
880     def recover(self):
881         self._testbed.recover()
882
883
884 class ExperimentControllerServer(BaseServer):
885     def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
886             clean_root):
887         super(ExperimentControllerServer, self).__init__(root_dir, log_level, 
888             environment_setup = environment_setup, clean_root = clean_root)
889         self._experiment_xml = experiment_xml
890         self._experiment = None
891
892     def post_daemonize(self):
893         from nepi.core.execute import ExperimentController
894         self._experiment = ExperimentController(self._experiment_xml, 
895             root_dir = self._root_dir)
896
897     @Marshalling.handles(GUIDS)
898     @Marshalling.args()
899     @Marshalling.retval( Marshalling.pickled_data )
900     def guids(self):
901         return self._experiment.guids
902
903     @Marshalling.handles(STARTED_TIME)
904     @Marshalling.args()
905     @Marshalling.retval( Marshalling.pickled_data )
906     def started_time(self):
907         return self._experiment.started_time
908
909     @Marshalling.handles(STOPPED_TIME)
910     @Marshalling.args()
911     @Marshalling.retval( Marshalling.pickled_data )
912     def stopped_time(self):
913         return self._experiment.stopped_time
914
915     @Marshalling.handles(XML)
916     @Marshalling.args()
917     @Marshalling.retval()
918     def experiment_design_xml(self):
919         return self._experiment.experiment_design_xml
920         
921     @Marshalling.handles(EXEC_XML)
922     @Marshalling.args()
923     @Marshalling.retval()
924     def experiment_execute_xml(self):
925         return self._experiment.experiment_execute_xml
926         
927     @Marshalling.handles(TRACE)
928     @Marshalling.args(int, str, Marshalling.base64_data)
929     @Marshalling.retval()
930     def trace(self, guid, trace_id, attribute):
931         return str(self._experiment.trace(guid, trace_id, attribute))
932
933     @Marshalling.handles(TRACES_INFO)
934     @Marshalling.args()
935     @Marshalling.retval( Marshalling.pickled_data )
936     def traces_info(self):
937         return self._experiment.traces_info()
938
939     @Marshalling.handles(FINISHED)
940     @Marshalling.args(int)
941     @Marshalling.retval(Marshalling.bool)
942     def is_finished(self, guid):
943         return self._experiment.is_finished(guid)
944
945     @Marshalling.handles(STATUS)
946     @Marshalling.args(int)
947     @Marshalling.retval(int)
948     def status(self, guid):
949         return self._experiment.status(guid)
950
951     @Marshalling.handles(GET)
952     @Marshalling.args(int, Marshalling.base64_data, str)
953     @Marshalling.retval( Marshalling.pickled_data )
954     def get(self, guid, name, time):
955         return self._experiment.get(guid, name, time)
956
957     @Marshalling.handles(SET)
958     @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
959     @Marshalling.retvoid
960     def set(self, guid, name, value, time):
961         self._experiment.set(guid, name, value, time)
962
963     @Marshalling.handles(START)
964     @Marshalling.args()
965     @Marshalling.retvoid
966     def start(self):
967         self._experiment.start()
968
969     @Marshalling.handles(STOP)
970     @Marshalling.args()
971     @Marshalling.retvoid
972     def stop(self):
973         self._experiment.stop()
974
975     @Marshalling.handles(RECOVER)
976     @Marshalling.args()
977     @Marshalling.retvoid
978     def recover(self):
979         self._experiment.recover()
980
981     @Marshalling.handles(SHUTDOWN)
982     @Marshalling.args()
983     @Marshalling.retvoid
984     def shutdown(self):
985         self._experiment.shutdown()
986
987     @Marshalling.handles(GET_TESTBED_ID)
988     @Marshalling.args(int)
989     @Marshalling.retval()
990     def get_testbed_id(self, guid):
991         return self._experiment.get_testbed_id(guid)
992
993     @Marshalling.handles(GET_FACTORY_ID)
994     @Marshalling.args(int)
995     @Marshalling.retval()
996     def get_factory_id(self, guid):
997         return self._experiment.get_factory_id(guid)
998
999     @Marshalling.handles(GET_TESTBED_VERSION)
1000     @Marshalling.args(int)
1001     @Marshalling.retval()
1002     def get_testbed_version(self, guid):
1003         return self._experiment.get_testbed_version(guid)
1004
1005 class BaseProxy(object):
1006     _ServerClass = None
1007     _ServerClassModule = "nepi.util.proxy"
1008     
1009     def __init__(self, ctor_args, root_dir, 
1010             launch = True, 
1011             communication = DC.ACCESS_LOCAL,
1012             host = None, 
1013             port = None, 
1014             user = None, 
1015             ident_key = None, 
1016             agent = None,
1017             sudo = False, 
1018             environment_setup = "",
1019             clean_root = False):
1020         if launch:
1021             python_code = (
1022                     "from %(classmodule)s import %(classname)s;"
1023                     "s = %(classname)s%(ctor_args)r;"
1024                     "s.run()" 
1025                 % dict(
1026                     classname = self._ServerClass.__name__,
1027                     classmodule = self._ServerClassModule,
1028                     ctor_args = ctor_args
1029                 ) )
1030             proc = server.popen_python(python_code,
1031                         communication = communication,
1032                         host = host,
1033                         port = port, 
1034                         user = user, 
1035                         agent = agent,
1036                         ident_key = ident_key, 
1037                         sudo = sudo,
1038                         environment_setup = environment_setup) 
1039             # Wait for the server to be ready, otherwise nobody
1040             # will be able to connect to it
1041             err = []
1042             helo = "nope"
1043             while helo:
1044                 helo = proc.stderr.readline()
1045                 if helo == 'SERVER_READY.\n':
1046                     break
1047                 err.append(helo)
1048             else:
1049                 raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
1050         # connect client to server
1051         self._client = server.Client(root_dir, 
1052                 communication = communication,
1053                 host = host, 
1054                 port = port, 
1055                 user = user, 
1056                 agent = agent, 
1057                 sudo = sudo,
1058                 environment_setup = environment_setup)
1059     
1060     @staticmethod
1061     def _make_message(argtypes, argencoders, command, methname, classname, *args):
1062         if len(argtypes) != len(argencoders):
1063             raise ValueError, "Invalid arguments for _make_message: "\
1064                 "in stub method %s of class %s "\
1065                 "argtypes and argencoders must match in size" % (
1066                     methname, classname )
1067         if len(argtypes) != len(args):
1068             raise ValueError, "Invalid arguments for _make_message: "\
1069                 "in stub method %s of class %s "\
1070                 "expected %d arguments, got %d" % (
1071                     methname, classname,
1072                     len(argtypes), len(args))
1073         
1074         buf = []
1075         for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
1076             try:
1077                 buf.append(fmt % encode(val))
1078             except:
1079                 import traceback
1080                 raise TypeError, "Argument %d of stub method %s of class %s "\
1081                     "requires a value of type %s, but got %s - nested error: %s" % (
1082                         argnum, methname, classname,
1083                         getattr(typ, '__name__', typ), type(val),
1084                         traceback.format_exc()
1085                 )
1086         
1087         return "%d|%s" % (command, '|'.join(buf))
1088     
1089     @staticmethod
1090     def _parse_reply(rvtype, methname, classname, reply):
1091         if not reply:
1092             raise RuntimeError, "Invalid reply: %r "\
1093                 "for stub method %s of class %s" % (
1094                     reply,
1095                     methname,
1096                     classname)
1097         
1098         try:
1099             result = reply.split("|")
1100             code = int(result[0])
1101             text = result[1]
1102         except:
1103             import traceback
1104             raise TypeError, "Return value of stub method %s of class %s "\
1105                 "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
1106                     methname, classname,
1107                     getattr(rvtype, '__name__', rvtype), reply,
1108                     traceback.format_exc()
1109             )
1110         if code == ERROR:
1111             text = base64.b64decode(text)
1112             raise RuntimeError(text)
1113         elif code == OK:
1114             try:
1115                 if rvtype is None:
1116                     return
1117                 else:
1118                     return rvtype(text)
1119             except:
1120                 import traceback
1121                 raise TypeError, "Return value of stub method %s of class %s "\
1122                     "cannot be parsed: must be of type %s - nested error: %s" % (
1123                         methname, classname,
1124                         getattr(rvtype, '__name__', rvtype),
1125                         traceback.format_exc()
1126                 )
1127         else:
1128             raise RuntimeError, "Invalid reply: %r "\
1129                 "for stub method %s of class %s - unknown code" % (
1130                     reply,
1131                     methname,
1132                     classname)
1133     
1134     @staticmethod
1135     def _make_stubs(server_class, template_class):
1136         """
1137         Returns a dictionary method_name -> method
1138         with stub methods.
1139         
1140         Usage:
1141         
1142             class SomeProxy(BaseProxy):
1143                ...
1144                
1145                locals().update( BaseProxy._make_stubs(
1146                     ServerClass,
1147                     TemplateClass
1148                ) )
1149         
1150         ServerClass is the corresponding Server class, as
1151         specified in the _ServerClass class method (_make_stubs
1152         is static and can't access the method), and TemplateClass
1153         is the ultimate implementation class behind the server,
1154         from which argument names and defaults are taken, to
1155         maintain meaningful interfaces.
1156         """
1157         rv = {}
1158         
1159         class NONE: pass
1160         
1161         import os.path
1162         func_template_path = os.path.join(
1163             os.path.dirname(__file__),
1164             'proxy_stub.tpl')
1165         func_template_file = open(func_template_path, "r")
1166         func_template = func_template_file.read()
1167         func_template_file.close()
1168         
1169         for methname in vars(template_class).copy():
1170             if methname.endswith('_deferred'):
1171                 # cannot wrap deferreds...
1172                 continue
1173             dmethname = methname+'_deferred'
1174             if hasattr(server_class, methname) and not methname.startswith('_'):
1175                 template_meth = getattr(template_class, methname)
1176                 server_meth = getattr(server_class, methname)
1177                 
1178                 command = getattr(server_meth, '_handles_command', None)
1179                 argtypes = getattr(server_meth, '_argtypes', None)
1180                 argencoders = getattr(server_meth, '_argencoders', None)
1181                 rvtype = getattr(server_meth, '_retval', None)
1182                 doprop = False
1183                 
1184                 if hasattr(template_meth, 'fget'):
1185                     # property getter
1186                     template_meth = template_meth.fget
1187                     doprop = True
1188                 
1189                 if command is not None and argtypes is not None and argencoders is not None:
1190                     # We have an interface method...
1191                     code = template_meth.func_code
1192                     argnames = code.co_varnames[:code.co_argcount]
1193                     argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
1194                                   + (template_meth.func_defaults or ()) )
1195                     
1196                     func_globals = dict(
1197                         BaseProxy = BaseProxy,
1198                         argtypes = argtypes,
1199                         argencoders = argencoders,
1200                         rvtype = rvtype,
1201                         functools = functools,
1202                     )
1203                     context = dict()
1204                     
1205                     func_text = func_template % dict(
1206                         self = argnames[0],
1207                         args = '%s' % (','.join(argnames[1:])),
1208                         argdefs = ','.join([
1209                             argname if argdef is NONE
1210                             else "%s=%r" % (argname, argdef)
1211                             for argname, argdef in zip(argnames[1:], argdefaults[1:])
1212                         ]),
1213                         command = command,
1214                         methname = methname,
1215                         classname = server_class.__name__
1216                     )
1217                     
1218                     func_text = compile(
1219                         func_text,
1220                         func_template_path,
1221                         'exec')
1222                     
1223                     exec func_text in func_globals, context
1224                     
1225                     if doprop:
1226                         rv[methname] = property(context[methname])
1227                         rv[dmethname] = property(context[dmethname])
1228                     else:
1229                         rv[methname] = context[methname]
1230                         rv[dmethname] = context[dmethname]
1231                     
1232                     # inject _deferred into core classes
1233                     if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
1234                         def freezename(methname, dmethname):
1235                             def dmeth(self, *p, **kw): 
1236                                 return getattr(self, methname)(*p, **kw)
1237                             dmeth.__name__ = dmethname
1238                             return dmeth
1239                         dmeth = freezename(methname, dmethname)
1240                         setattr(template_class, dmethname, dmeth)
1241         
1242         return rv
1243
1244 class ExperimentSuiteProxy(BaseProxy):
1245     
1246     _ServerClass = ExperimentSuiteServer
1247     
1248     def __init__(self, root_dir, log_level,
1249             xml, repetitions, duration, wait_guids, 
1250             communication = DC.ACCESS_LOCAL,
1251             host = None, 
1252             port = None, 
1253             user = None, 
1254             ident_key = None, 
1255             agent = None,
1256             sudo = False, 
1257             environment_setup = "", 
1258             clean_root = False):
1259         super(ExperimentSuiteProxy,self).__init__(
1260             ctor_args = (root_dir, log_level,
1261                 xml, 
1262                 repetitions, 
1263                 duration,
1264                 wait_guids, 
1265                 communication,
1266                 host, 
1267                 port, 
1268                 user, 
1269                 ident_key,
1270                 agent, 
1271                 sudo, 
1272                 environment_setup, 
1273                 clean_root),
1274             root_dir = root_dir,
1275             launch = True, #launch
1276             communication = communication,
1277             host = host, 
1278             port = port, 
1279             user = user,
1280             ident_key = ident_key, 
1281             agent = agent, 
1282             sudo = sudo, 
1283             environment_setup = environment_setup)
1284
1285     locals().update( BaseProxy._make_stubs(
1286         server_class = ExperimentSuiteServer,
1287         template_class = nepi.core.execute.ExperimentSuite,
1288     ) )
1289     
1290     # Shutdown stops the serverside...
1291     def shutdown(self, _stub = shutdown):
1292         rv = _stub(self)
1293         self._client.send_stop()
1294         self._client.read_reply() # wait for it
1295         return rv
1296
1297 class TestbedControllerProxy(BaseProxy):
1298     
1299     _ServerClass = TestbedControllerServer
1300     
1301     def __init__(self, root_dir, log_level, 
1302             testbed_id = None, 
1303             testbed_version = None, 
1304             launch = True, 
1305             communication = DC.ACCESS_LOCAL,
1306             host = None, 
1307             port = None, 
1308             user = None, 
1309             ident_key = None, 
1310             agent = None,
1311             sudo = False, 
1312             environment_setup = "", 
1313             clean_root = False):
1314         if launch and (testbed_id == None or testbed_version == None):
1315             raise RuntimeError("To launch a TesbedControllerServer a "
1316                     "testbed_id and testbed_version are required")
1317         super(TestbedControllerProxy,self).__init__(
1318             ctor_args = (root_dir, log_level, testbed_id, testbed_version,
1319                 environment_setup, clean_root),
1320             root_dir = root_dir,
1321             launch = launch,
1322             communication = communication,
1323             host = host, 
1324             port = port, 
1325             user = user,
1326             ident_key = ident_key, 
1327             agent = agent, 
1328             sudo = sudo, 
1329             environment_setup = environment_setup)
1330
1331     locals().update( BaseProxy._make_stubs(
1332         server_class = TestbedControllerServer,
1333         template_class = nepi.core.execute.TestbedController,
1334     ) )
1335     
1336     # Shutdown stops the serverside...
1337     def shutdown(self, _stub = shutdown):
1338         rv = _stub(self)
1339         self._client.send_stop()
1340         self._client.read_reply() # wait for it
1341         return rv
1342     
1343
1344 class ExperimentControllerProxy(BaseProxy):
1345     _ServerClass = ExperimentControllerServer
1346     
1347     def __init__(self, root_dir, log_level, 
1348             experiment_xml = None, 
1349             launch = True, 
1350             communication = DC.ACCESS_LOCAL,
1351             host = None, 
1352             port = None, 
1353             user = None, 
1354             ident_key = None, 
1355             agent = None, 
1356             sudo = False, 
1357             environment_setup = "",
1358             clean_root = False):
1359         super(ExperimentControllerProxy,self).__init__(
1360             ctor_args = (root_dir, log_level, experiment_xml, environment_setup, 
1361                 clean_root),
1362             root_dir = root_dir,
1363             launch = launch, 
1364             communication = communication,
1365             host = host, 
1366             port = port, 
1367             user = user,
1368             ident_key = ident_key, 
1369             agent = agent, 
1370             sudo = sudo, 
1371             environment_setup = environment_setup,
1372             clean_root = clean_root)
1373
1374     locals().update( BaseProxy._make_stubs(
1375         server_class = ExperimentControllerServer,
1376         template_class = nepi.core.execute.ExperimentController,
1377     ) )
1378
1379     # Shutdown stops the serverside...
1380     def shutdown(self, _stub = shutdown):
1381         rv = _stub(self)
1382         self._client.send_stop()
1383         self._client.read_reply() # wait for it
1384         return rv
1385