282fd92bfeb5d026305ea84dd6ecf6927c2af1b4
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 import nepi.core.execute
6 import nepi.util.environ
7 from nepi.core.attributes import AttributesMap, Attribute
8 from nepi.util import server, validation
9 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
10 import getpass
11 import cPickle
12 import sys
13 import time
14 import tempfile
15 import shutil
16 import functools
17 import os
18
# PROTOCOL REPLIES
# First field of every reply string ("<code>|<payload>").
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# First field of every request string. The numeric values are part of the
# wire protocol shared by servers and proxies, so they must never be
# renumbered. Code 3 is not assigned in this table.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
DO_PRESTART = 37
GET_FACTORY_ID = 38
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
TESTBED_STATUS = 43
STARTED_TIME = 44
STOPPED_TIME = 45
CURRENT = 46
ACCESS_CONFIGURATIONS = 47
CURRENT_ACCESS_CONFIG = 48


# Human readable name of every protocol code, used only for logging.
# Every constant above needs an entry here: the loggers look codes up in
# this table, and a missing entry makes the message disappear from the log.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    EXEC_XML: "EXEC_XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_PRESTART: "DO_PRESTART",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    GET_FACTORY_ID: "GET_FACTORY_ID",
    GET_TESTBED_ID: "GET_TESTBED_ID",
    GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
    ACTION: "ACTION",
    STATUS: "STATUS",
    TESTBED_STATUS: "TESTBED_STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    TRACES_INFO: "TRACES_INFO",
    STARTED_TIME: "STARTED_TIME",
    STOPPED_TIME: "STOPPED_TIME",
    CURRENT: "CURRENT",
    ACCESS_CONFIGURATIONS: "ACCESS_CONFIGURATIONS",
    CURRENT_ACCESS_CONFIG: "CURRENT_ACCESS_CONFIG",
}
121
def log_msg(server, params):
    """
    Write a debug trace of an incoming protocol message to the server log.

    params is the already split message: params[0] holds the numeric
    instruction code and the remaining items are its arguments.

    Logging is best effort: any error raised while formatting or emitting
    the entry is swallowed, so tracing can never break message handling.
    """
    try:
        instr = int(params[0])
        # Codes without a registered name are still traced (by number)
        # instead of vanishing from the log.
        instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
        server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
            instr_txt, ", ".join(map(str, params[1:]))))
    except Exception:
        # don't die for logging (but let KeyboardInterrupt/SystemExit through)
        pass
131
def log_reply(server, reply):
    """
    Write a debug trace of an outgoing reply string to the server log.

    A reply looks like "<code>|<payload>". The code is translated to its
    name and the payload is base64-decoded when possible (otherwise it is
    shown verbatim). If the reply cannot be taken apart at all, it is
    logged raw instead.
    """
    try:
        fields = reply.split("|")
        status_name = instruction_text[int(fields[0])]
        try:
            payload = base64.b64decode(fields[1])
        except:
            # not base64: show the payload as received
            payload = fields[1]
        server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
                status_name, payload))
    except:
        # don't die for logging: fall back to the unparsed reply
        server.log_debug("%s - reply: %s" % (server.__class__.__name__,
                reply))
148
def to_server_log_level(log_level):
    """
    Collapse a configured log level to one of the two levels used here:
    DC.DEBUG_LEVEL is kept as is, anything else becomes DC.ERROR_LEVEL.
    """
    if log_level == DC.DEBUG_LEVEL:
        return DC.DEBUG_LEVEL
    return DC.ERROR_LEVEL
155
def get_access_config_params(access_config):
    """
    Flatten an access configuration into the tuple used by the factory
    functions of this module.

    Returns, in this order:
        (mode, launch, root_dir, log_level, communication, user, host,
         port, key, agent, sudo, environment_setup, clean_root)

    - launch is the negation of the DC.RECOVER attribute.
    - log_level is normalized through to_server_log_level().
    - environment_setup defaults to "" when the configuration does not
      define DC.DEPLOYMENT_ENVIRONMENT_SETUP.
    """
    value_of = access_config.get_attribute_value

    mode = value_of(DC.DEPLOYMENT_MODE)
    launch = not value_of(DC.RECOVER)
    root_dir = value_of(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(value_of(DC.LOG_LEVEL))
    # Read once only (the attribute used to be fetched twice).
    communication = value_of(DC.DEPLOYMENT_COMMUNICATION)
    environment_setup = (
        value_of(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
        if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
        else ""
    )
    user = value_of(DC.DEPLOYMENT_USER)
    host = value_of(DC.DEPLOYMENT_HOST)
    port = value_of(DC.DEPLOYMENT_PORT)
    agent = value_of(DC.USE_AGENT)
    sudo = value_of(DC.USE_SUDO)
    key = value_of(DC.DEPLOYMENT_KEY)
    clean_root = value_of(DC.CLEAN_ROOT)
    return (mode, launch, root_dir, log_level, communication, user, host, port,
            key, agent, sudo, environment_setup, clean_root)
178
class AccessConfiguration(AttributesMap):
    """
    Attribute map pre-populated with every entry of
    Metadata.PROXY_ATTRIBUTES.

    params, when given, maps attribute names to raw values; each value is
    converted with the parser registered for the attribute's type in
    Attribute.type_parsers before being stored.
    """
    def __init__(self, params = None):
        super(AccessConfiguration, self).__init__()

        # Imported at call time rather than at module level
        # (presumably to avoid an import cycle -- TODO confirm).
        from nepi.core.metadata import Metadata

        for attr_info in Metadata.PROXY_ATTRIBUTES.values():
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.items():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
193
class TempDir(object):
    """
    Owner of a freshly created temporary directory.

    The directory is created on construction (its location is available
    as .path) and removed, with everything in it, when the object is
    finalized.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # A finalizer must never raise: .path is missing if mkdtemp()
        # failed, and the directory may already have been removed.
        path = getattr(self, "path", None)
        if path:
            shutil.rmtree(path, ignore_errors=True)
200
class PermDir(object):
    """
    Holder for a caller supplied directory path, exposed as .path.

    Unlike TempDir it defines no finalizer, so the directory is left
    untouched when the object goes away.
    """
    def __init__(self, path):
        self.path = path
204
def create_experiment_suite(xml, access_config, repetitions = None,
        duration = None, wait_guids = None):
    """
    Build an experiment suite according to the given access configuration.

    With no access configuration, no deployment mode, or
    DC.MODE_SINGLE_PROCESS, an in-process ExperimentSuite is returned.
    With DC.MODE_DAEMON an ExperimentSuiteProxy is returned instead.

    Raises RuntimeError for any other deployment mode.
    """
    mode = None
    # Must be bound even without an access configuration: the in-process
    # branch below tests it (it used to raise UnboundLocalError).
    root_dir = None
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port,
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentSuite
        workdir = PermDir(root_dir) if root_dir else TempDir()

        exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
                wait_guids)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        exp_suite._tempdir = workdir
        return exp_suite
    elif mode == DC.MODE_DAEMON:
        return ExperimentSuiteProxy(root_dir, log_level,
                xml,
                repetitions = repetitions,
                duration = duration,
                wait_guids = wait_guids,
                communication = communication,
                host = host,
                port = port,
                user = user,
                ident_key = key,
                agent = agent,
                sudo = sudo,
                environment_setup = environment_setup,
                clean_root = clean_root)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
243
def create_experiment_controller(xml, access_config = None):
    """
    Build an experiment controller according to the access configuration.

    With no access configuration, no deployment mode, or
    DC.MODE_SINGLE_PROCESS, an in-process ExperimentController is returned
    (and asked to recover() when the configuration requests recovery).
    With DC.MODE_DAEMON an ExperimentControllerProxy is returned; in
    recovery mode, if the proxy cannot be created, a new one is launched
    and asked to recover().

    As a side effect the NEPI_CONTROLLER_LOGLEVEL environment variable is
    set to the effective log level.

    Raises RuntimeError for any other deployment mode.
    """
    mode = None
    launch = True
    log_level = DC.ERROR_LEVEL
    if access_config:
        (mode, launch, root_dir, log_level, communication, user, host, port,
                key, agent, sudo, environment_setup, clean_root) \
                        = get_access_config_params(access_config)

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        if not launch:
            # try to recover
            controller.recover()

        return controller
    elif mode == DC.MODE_DAEMON:
        # Everything but 'launch' is identical for both attempts below.
        proxy_kwargs = dict(
            experiment_xml = xml,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
            clean_root = clean_root)
        try:
            return ExperimentControllerProxy(root_dir, log_level,
                launch = launch, **proxy_kwargs)
        except Exception:
            # Only real errors trigger the fallback; KeyboardInterrupt and
            # SystemExit propagate instead of causing a relaunch.
            if launch:
                raise
            # Maybe controller died, recover from persisted testbed information if possible
            controller = ExperimentControllerProxy(root_dir, log_level,
                launch = True, **proxy_kwargs)
            controller.recover()
            return controller
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
307
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """
    Build a testbed controller according to the access configuration.

    In-process operation (no configuration, no mode, or
    DC.MODE_SINGLE_PROCESS) instantiates the controller directly and does
    not support recovery (ValueError). DC.MODE_DAEMON returns a
    TestbedControllerProxy. Any other mode raises RuntimeError.

    As a side effect the NEPI_CONTROLLER_LOGLEVEL environment variable is
    set to the effective log level.
    """
    mode, launch, log_level = None, True, DC.ERROR_LEVEL
    if access_config:
        (mode, launch, root_dir, log_level, communication,
         user, host, port, key, agent, sudo,
         environment_setup, clean_root) = get_access_config_params(access_config)

    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level

    in_process = not mode or mode == DC.MODE_SINGLE_PROCESS
    if in_process:
        if launch:
            return _build_testbed_controller(testbed_id, testbed_version)
        raise ValueError("Unsupported instantiation mode: %s with launch=False" % (mode,))

    if mode == DC.MODE_DAEMON:
        return TestbedControllerProxy(
            root_dir,
            log_level,
            testbed_id = testbed_id,
            testbed_version = testbed_version,
            communication = communication,
            host = host,
            port = port,
            ident_key = key,
            user = user,
            agent = agent,
            sudo = sudo,
            launch = launch,
            environment_setup = environment_setup,
            clean_root = clean_root)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
338
def _build_testbed_controller(testbed_id, testbed_version):
    """
    Instantiate the TestbedController of the module that implements the
    given testbed, importing that module first if it is not loaded yet.

    Raises ImportError when the module cannot be imported, and
    RuntimeError when the controller reports a version other than
    testbed_version.
    """
    mod_name = nepi.util.environ.find_testbed(testbed_id)

    if mod_name not in sys.modules:
        try:
            __import__(mod_name)
        except ImportError:
            raise ImportError("Cannot find module %s in %r" % (mod_name, sys.path))

    controller = sys.modules[mod_name].TestbedController()
    if controller.testbed_version == testbed_version:
        return controller

    raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
            (testbed_id, testbed_version, controller.testbed_version))
354
# Just a namespace class
class Marshalling:
    """
    Namespace for the wire (de)serialization helpers of the protocol.

    - Decoders: turn a received string field into a Python value.
    - Encoders: turn a Python value into a string field.
    - args / retval / retvoid / handles: decorators used by the servers to
      declare the argument types, return type and command code of each
      handler method.

    Every public decoder is also available directly on Marshalling
    (e.g. Marshalling.pickled_data), where it acts as a "type".
    """
    class Decoders:
        @staticmethod
        def pickled_data(sdata):
            # SECURITY NOTE(review): unpickling executes arbitrary code if
            # the peer is not trusted. Left as is because the protocol
            # relies on it -- confirm the channel is always authenticated.
            return cPickle.loads(base64.b64decode(sdata))

        @staticmethod
        def base64_data(sdata):
            return base64.b64decode(sdata)

        @staticmethod
        def nullint(sdata):
            # the literal string "None" stands for a missing integer
            return None if sdata == "None" else int(sdata)

        @staticmethod
        def bool(sdata):
            return sdata == 'True'

    class Encoders:
        @staticmethod
        def pickled_data(data):
            return base64.b64encode(cPickle.dumps(data))

        @staticmethod
        def base64_data(data):
            # None and empty data both travel as the empty string
            if not data:
                return ""
            return base64.b64encode(data)

        @staticmethod
        def nullint(data):
            # Always a string, as required from encoders (this used to
            # return an int for non-None values).
            return "None" if data is None else str(int(data))

        @staticmethod
        def bool(data):
            return str(bool(data))

    # Type name -> (<encoding_function>, <formatting_string>)
    _TYPE_ENCODERS = {}

    # Import into Marshalling all the public decoders (they act as types)
    # and register the encoder of the same name, if there is one.
    # Plain loop rather than comprehensions: comprehensions in a class body
    # leak their variables (Python 2) or cannot see Encoders (Python 3).
    for _name, _decoder in list(vars(Decoders).items()):
        if _name.startswith('_'):
            continue
        locals()[_name] = _decoder
        if hasattr(Encoders, _name):
            _TYPE_ENCODERS[_name] = (getattr(Encoders, _name), "%s")
    del _name, _decoder

    # Builtins
    _TYPE_ENCODERS["float"] = (float, "%r")
    _TYPE_ENCODERS["int"] = (int, "%d")
    _TYPE_ENCODERS["long"] = (int, "%d")
    _TYPE_ENCODERS["str"] = (str, "%s")
    _TYPE_ENCODERS["unicode"] = (str, "%s")

    # Generic encoder
    _TYPE_ENCODERS[None] = (str, "%s")

    @staticmethod
    def args(*types):
        """
        Decorator that converts the given function into one that takes
        a single "params" list, with each parameter marshalled according
        to the given factory callable (type constructors are accepted).

        The first argument (self) is left untouched, and params[0] (the
        instruction code) is skipped.

        The resulting function carries _argtypes (the given types) and
        _argencoders (the matching (encoder, format) pairs).

        eg:

        @Marshalling.args(int,int,str,base64_data)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someretval
        """
        def decor(f):
            @functools.wraps(f)
            def rv(self, params):
                return f(self, *[ ctor(val)
                                  for ctor,val in zip(types, params[1:]) ])

            rv._argtypes = types

            # Derive type encoders by looking up types in _TYPE_ENCODERS
            # make_proxy will use it to encode arguments in command strings
            TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
            rv._argencoders = tuple(
                # unknown types fall back to the generic encoder
                TYPE_ENCODERS.get(typ.__name__, TYPE_ENCODERS[None])
                for typ in types
            )

            rv._retval = getattr(f, '_retval', None)
            return rv
        return decor

    @staticmethod
    def retval(typ=Decoders.base64_data):
        """
        Decorator that converts the given function into one that
        returns a properly encoded return string ("<OK>|<value>"), given
        that the undecorated function returns suitable input for the
        encoding function.

        The optional typ argument specifies a type.
        For the default of base64_data, return values should be strings.
        The return value of the encoding method should be a string always.

        eg:

        @Marshalling.args(int,int,str,base64_data)
        @Marshalling.retval(str)
        def somefunc(self, someint, otherint, somestr, someb64):
           return someint
        """
        encode, fmt = Marshalling._TYPE_ENCODERS.get(
            typ.__name__,
            Marshalling._TYPE_ENCODERS[None])
        fmt = "%d|"+fmt

        def decor(f):
            @functools.wraps(f)
            def rv(self, *p, **kw):
                data = f(self, *p, **kw)
                return fmt % (
                    OK,
                    encode(data)
                )
            rv._retval = typ
            rv._argtypes = getattr(f, '_argtypes', None)
            rv._argencoders = getattr(f, '_argencoders', None)
            return rv
        return decor

    @staticmethod
    def retvoid(f):
        """
        Decorator that converts the given function into one that
        always return an encoded empty string ("<OK>|").

        Useful for null-returning functions.
        """
        OKRV = "%d|" % (OK,)

        @functools.wraps(f)
        def rv(self, *p, **kw):
            f(self, *p, **kw)
            return OKRV

        rv._retval = None
        rv._argtypes = getattr(f, '_argtypes', None)
        rv._argencoders = getattr(f, '_argencoders', None)
        return rv

    @staticmethod
    def handles(whichcommand):
        """
        Associates the method with a given command code for servers
        (stored as the _handles_command attribute of the method).
        It should always be the topmost decorator.
        """
        def decor(f):
            f._handles_command = whichcommand
            return f
        return decor
526
class BaseServer(server.Server):
    """
    Server that dispatches protocol messages to handler methods.

    A handler is a public method defined directly on the concrete class
    and tagged with Marshalling.handles(<instruction code>).
    """
    def reply_action(self, msg):
        """
        Process one "<instruction>|<arg>|..." message and return the reply
        string ("<OK or ERROR>|<payload>").

        Never raises for a bad message: an empty message, a malformed or
        unknown instruction code, and any exception raised by the handler
        all produce an ERROR reply with a base64 encoded description.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            log_msg(self, params)
            try:
                # Parsed inside the try so that a non numeric instruction
                # yields an ERROR reply instead of an uncaught ValueError.
                instruction = int(params[0])
                for mname, meth in vars(self.__class__).items():
                    if mname.startswith('_'):
                        continue
                    if getattr(meth, '_handles_command', None) == instruction:
                        reply = getattr(self, mname)(params)
                        break
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # log_error() without arguments is expected to return the
                # text of the error being handled (defined in server.Server,
                # not visible here -- TODO confirm).
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply
555
class ExperimentSuiteServer(BaseServer):
    """
    Server side of the experiment suite protocol.

    The constructor only records the parameters and assembles the access
    configuration; the ExperimentSuite itself is created in
    post_daemonize(). Each handler below forwards to the suite method of
    the same name.
    """
    def __init__(self, root_dir, log_level, 
            xml, repetitions, duration, wait_guids, 
            communication = DC.ACCESS_LOCAL,
            host = None, 
            port = None, 
            user = None, 
            ident_key = None, 
            agent = None,
            sudo = False, 
            environment_setup = "", 
            clean_root = False):
        super(ExperimentSuiteServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        access_config = AccessConfiguration()
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
        access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)

        # Optional settings: only overridden when a truthy value was given.
        # (USE_SUDO used to be set through a misspelled variable name,
        # raising NameError whenever sudo was requested.)
        optional_settings = (
            (DC.DEPLOYMENT_USER, user),
            (DC.DEPLOYMENT_HOST, host),
            (DC.DEPLOYMENT_PORT, port),
            (DC.USE_AGENT, agent),
            (DC.USE_SUDO, sudo),
            (DC.DEPLOYMENT_KEY, ident_key),
            (DC.DEPLOYMENT_COMMUNICATION, communication),
            (DC.CLEAN_ROOT, clean_root),
        )
        for attr_name, attr_value in optional_settings:
            if attr_value:
                access_config.set_attribute_value(attr_name, attr_value)

        self._experiment_xml = xml
        self._duration = duration
        self._repetitions = repetitions
        self._wait_guids = wait_guids
        self._access_config = access_config
        self._experiment_suite = None

    def post_daemonize(self):
        """Create the ExperimentSuite from the recorded parameters."""
        from nepi.core.execute import ExperimentSuite
        self._experiment_suite = ExperimentSuite(
                self._experiment_xml, self._access_config, 
                self._repetitions, self._duration, self._wait_guids)

    @Marshalling.handles(CURRENT)
    @Marshalling.args()
    @Marshalling.retval(int)
    def current(self):
        return self._experiment_suite.current()
   
    @Marshalling.handles(STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def status(self):
        return self._experiment_suite.status()
    
    @Marshalling.handles(FINISHED)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self):
        return self._experiment_suite.is_finished()

    @Marshalling.handles(ACCESS_CONFIGURATIONS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def get_access_configurations(self):
        return self._experiment_suite.get_access_configurations()

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._experiment_suite.start()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._experiment_suite.shutdown()

    @Marshalling.handles(CURRENT_ACCESS_CONFIG)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def get_current_access_config(self):
        return self._experiment_suite.get_current_access_config()
644
class TestbedControllerServer(BaseServer):
    """
    Server side of the testbed controller protocol.

    The constructor only records the testbed id and version; the actual
    testbed controller is built in post_daemonize() and kept in
    self._testbed.

    Every handler below is bound to one instruction code by
    Marshalling.handles, declares the wire types of its arguments with
    Marshalling.args and of its result with Marshalling.retval/retvoid,
    and forwards to the self._testbed member of the same name.
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version, 
            environment_setup, clean_root):
        super(TestbedControllerServer, self).__init__(root_dir, log_level, 
            environment_setup = environment_setup, clean_root = clean_root)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created in post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        """Instantiate the testbed controller for the recorded id/version."""
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    # --- identification ---

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def guids(self):
        # guids is read as an attribute of the testbed, not called
        return self._testbed.guids

    @Marshalling.handles(TESTBED_ID)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_id(self):
        return str(self._testbed.testbed_id)

    @Marshalling.handles(TESTBED_VERSION)
    @Marshalling.args()
    @Marshalling.retval()
    def testbed_version(self):
        return str(self._testbed.testbed_version)

    @Marshalling.handles(CREATE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_create(self, guid, factory_id):
        self._testbed.defer_create(guid, factory_id)

    # --- traces ---

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        return self._testbed.trace(guid, trace_id, attribute)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval( Marshalling.pickled_data )
    def traces_info(self):
        return self._testbed.traces_info()

    # --- life cycle ---

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        self._testbed.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        self._testbed.stop()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        self._testbed.shutdown()

    # --- deferred configuration (defer_* calls) ---

    @Marshalling.handles(CONFIGURE)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_configure(self, name, value):
        self._testbed.defer_configure(name, value)

    @Marshalling.handles(CREATE_SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_create_set(self, guid, name, value):
        self._testbed.defer_create_set(guid, name, value)

    @Marshalling.handles(FACTORY_SET)
    @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_factory_set(self, name, value):
        self._testbed.defer_factory_set(name, value)

    @Marshalling.handles(CONNECT)
    @Marshalling.args(int, str, int, str)
    @Marshalling.retvoid
    def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)

    @Marshalling.handles(CROSS_CONNECT)
    @Marshalling.args(int, str, int, int, str, str, str)
    @Marshalling.retvoid
    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)

    @Marshalling.handles(ADD_TRACE)
    @Marshalling.args(int, str)
    @Marshalling.retvoid
    def defer_add_trace(self, guid, trace_id):
        self._testbed.defer_add_trace(guid, trace_id)

    @Marshalling.handles(ADD_ADDRESS)
    @Marshalling.args(int, str, int, Marshalling.pickled_data)
    @Marshalling.retvoid
    def defer_add_address(self, guid, address, netprefix, broadcast):
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)

    @Marshalling.handles(ADD_ROUTE)
    @Marshalling.args(int, str, int, str, int)
    @Marshalling.retvoid
    def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)

    # --- deployment steps (do_* calls) ---

    @Marshalling.handles(DO_SETUP)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_setup(self):
        self._testbed.do_setup()

    @Marshalling.handles(DO_CREATE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_create(self):
        self._testbed.do_create()

    @Marshalling.handles(DO_CONNECT_INIT)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_init(self):
        self._testbed.do_connect_init()

    @Marshalling.handles(DO_CONNECT_COMPL)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_connect_compl(self):
        self._testbed.do_connect_compl()

    @Marshalling.handles(DO_CONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_configure(self):
        self._testbed.do_configure()

    @Marshalling.handles(DO_PRECONFIGURE)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_preconfigure(self):
        self._testbed.do_preconfigure()

    @Marshalling.handles(DO_PRESTART)
    @Marshalling.args()
    @Marshalling.retvoid
    def do_prestart(self):
        self._testbed.do_prestart()

    @Marshalling.handles(DO_CROSS_CONNECT_INIT)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_init(self, cross_data):
        self._testbed.do_cross_connect_init(cross_data)

    @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
    @Marshalling.args( Marshalling.Decoders.pickled_data )
    @Marshalling.retvoid
    def do_cross_connect_compl(self, cross_data):
        self._testbed.do_cross_connect_compl(cross_data)

    # --- runtime queries and updates ---

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval( Marshalling.pickled_data )
    def get(self, guid, name, time):
        # 'time' is the argument here, it hides the time module locally
        return self._testbed.get(guid, name, time)

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        self._testbed.set(guid, name, value, time)

    @Marshalling.handles(GET_ADDRESS)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_address(self, guid, index, attribute):
        return str(self._testbed.get_address(guid, index, attribute))

    @Marshalling.handles(GET_ROUTE)
    @Marshalling.args(int, int, Marshalling.base64_data)
    @Marshalling.retval()
    def get_route(self, guid, index, attribute):
        return str(self._testbed.get_route(guid, index, attribute))

    @Marshalling.handles(ACTION)
    @Marshalling.args(str, int, Marshalling.base64_data)
    @Marshalling.retvoid
    def action(self, time, guid, command):
        self._testbed.action(time, guid, command)

    @Marshalling.handles(STATUS)
    @Marshalling.args(Marshalling.nullint)
    @Marshalling.retval(int)
    def status(self, guid):
        # guid travels as nullint, so it may be None
        return self._testbed.status(guid)

    @Marshalling.handles(TESTBED_STATUS)
    @Marshalling.args()
    @Marshalling.retval(int)
    def testbed_status(self):
        return self._testbed.testbed_status()

    @Marshalling.handles(GET_ATTRIBUTE_LIST)
    @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
    @Marshalling.retval( Marshalling.pickled_data )
    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        return self._testbed.get_attribute_list(guid, filter_flags, exclude)

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        # NOTE(review): unlike testbed_id/get_address, the result is not
        # wrapped in str() -- presumably already a string; confirm.
        return self._testbed.get_factory_id(guid)

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        self._testbed.recover()
881
882
class ExperimentControllerServer(BaseServer):
    """
    Server-side endpoint that exposes an ExperimentController.

    Every handler is tied to one protocol instruction through the
    Marshalling decorators and forwards the call to the controller held
    in self._experiment.
    """

    def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
            clean_root):
        """
        Initialise the base server and keep the experiment description.

        experiment_xml is only stored here; the controller is created
        later, in post_daemonize().
        """
        super(ExperimentControllerServer, self).__init__(
            root_dir,
            log_level,
            environment_setup = environment_setup,
            clean_root = clean_root)
        self._experiment = None
        self._experiment_xml = experiment_xml

    def post_daemonize(self):
        """Create the ExperimentController from the stored XML and root directory."""
        from nepi.core.execute import ExperimentController
        controller = ExperimentController(
            self._experiment_xml,
            root_dir = self._root_dir)
        self._experiment = controller

    @Marshalling.handles(GUIDS)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.pickled_data)
    def guids(self):
        """Handle GUIDS: return the controller's guids attribute."""
        experiment = self._experiment
        return experiment.guids

    @Marshalling.handles(STARTED_TIME)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.pickled_data)
    def started_time(self):
        """Handle STARTED_TIME: return the controller's started_time attribute."""
        experiment = self._experiment
        return experiment.started_time

    @Marshalling.handles(STOPPED_TIME)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.pickled_data)
    def stopped_time(self):
        """Handle STOPPED_TIME: return the controller's stopped_time attribute."""
        experiment = self._experiment
        return experiment.stopped_time

    @Marshalling.handles(XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_design_xml(self):
        """Handle XML: return the controller's experiment_design_xml attribute."""
        experiment = self._experiment
        return experiment.experiment_design_xml

    @Marshalling.handles(EXEC_XML)
    @Marshalling.args()
    @Marshalling.retval()
    def experiment_execute_xml(self):
        """Handle EXEC_XML: return the controller's experiment_execute_xml attribute."""
        experiment = self._experiment
        return experiment.experiment_execute_xml

    @Marshalling.handles(TRACE)
    @Marshalling.args(int, str, Marshalling.base64_data)
    @Marshalling.retval()
    def trace(self, guid, trace_id, attribute):
        """Handle TRACE: return str() of the controller's trace(guid, trace_id, attribute)."""
        trace_value = self._experiment.trace(guid, trace_id, attribute)
        return str(trace_value)

    @Marshalling.handles(TRACES_INFO)
    @Marshalling.args()
    @Marshalling.retval(Marshalling.pickled_data)
    def traces_info(self):
        """Handle TRACES_INFO: return the controller's traces_info() result."""
        info = self._experiment.traces_info()
        return info

    @Marshalling.handles(FINISHED)
    @Marshalling.args(int)
    @Marshalling.retval(Marshalling.bool)
    def is_finished(self, guid):
        """Handle FINISHED: return the controller's is_finished(guid) result."""
        finished = self._experiment.is_finished(guid)
        return finished

    @Marshalling.handles(STATUS)
    @Marshalling.args(int)
    @Marshalling.retval(int)
    def status(self, guid):
        """Handle STATUS: return the controller's status(guid) result."""
        current_status = self._experiment.status(guid)
        return current_status

    @Marshalling.handles(GET)
    @Marshalling.args(int, Marshalling.base64_data, str)
    @Marshalling.retval(Marshalling.pickled_data)
    def get(self, guid, name, time):
        """Handle GET: return the controller's get(guid, name, time) result."""
        value = self._experiment.get(guid, name, time)
        return value

    @Marshalling.handles(SET)
    @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
    @Marshalling.retvoid
    def set(self, guid, name, value, time):
        """Handle SET: forward (guid, name, value, time) to the controller's set()."""
        experiment = self._experiment
        experiment.set(guid, name, value, time)

    @Marshalling.handles(START)
    @Marshalling.args()
    @Marshalling.retvoid
    def start(self):
        """Handle START by calling start() on the controller."""
        experiment = self._experiment
        experiment.start()

    @Marshalling.handles(STOP)
    @Marshalling.args()
    @Marshalling.retvoid
    def stop(self):
        """Handle STOP by calling stop() on the controller."""
        experiment = self._experiment
        experiment.stop()

    @Marshalling.handles(RECOVER)
    @Marshalling.args()
    @Marshalling.retvoid
    def recover(self):
        """Handle RECOVER by calling recover() on the controller."""
        experiment = self._experiment
        experiment.recover()

    @Marshalling.handles(SHUTDOWN)
    @Marshalling.args()
    @Marshalling.retvoid
    def shutdown(self):
        """Handle SHUTDOWN by calling shutdown() on the controller."""
        experiment = self._experiment
        experiment.shutdown()

    @Marshalling.handles(GET_TESTBED_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_id(self, guid):
        """Handle GET_TESTBED_ID: return the controller's get_testbed_id(guid) result."""
        testbed_id = self._experiment.get_testbed_id(guid)
        return testbed_id

    @Marshalling.handles(GET_FACTORY_ID)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_factory_id(self, guid):
        """Handle GET_FACTORY_ID: return the controller's get_factory_id(guid) result."""
        factory_id = self._experiment.get_factory_id(guid)
        return factory_id

    @Marshalling.handles(GET_TESTBED_VERSION)
    @Marshalling.args(int)
    @Marshalling.retval()
    def get_testbed_version(self, guid):
        """Handle GET_TESTBED_VERSION: return the controller's get_testbed_version(guid) result."""
        testbed_version = self._experiment.get_testbed_version(guid)
        return testbed_version
1003
class BaseProxy(object):
    """
    Client-side counterpart of a server class defined in this module.

    A proxy optionally launches its server (named by _ServerClass) through
    server.popen_python and then talks to it through a server.Client.
    The static helpers encode requests, decode replies and generate the
    stub methods that subclasses inject into their class bodies.
    """

    # Server class instantiated by the launch code; set by subclasses.
    _ServerClass = None
    # Module from which the launch code imports _ServerClass.
    _ServerClassModule = "nepi.util.proxy"

    def __init__(self, ctor_args, root_dir,
            launch = True,
            communication = DC.ACCESS_LOCAL,
            host = None,
            port = None,
            user = None,
            ident_key = None,
            agent = None,
            sudo = False,
            environment_setup = "",
            clean_root = False):
        """
        Optionally launch the server, then connect a client to it.

        ctor_args -- tuple of constructor arguments for _ServerClass. Its
            repr() is pasted into the launch code, so every element must
            have a repr() that evaluates back to an equivalent value.
        root_dir -- passed to server.Client.
        launch -- when true, start the server through server.popen_python
            and wait for its 'SERVER_READY.' line on stderr.
        communication, host, port, user, agent, sudo, environment_setup --
            forwarded to both server.popen_python and server.Client.
        ident_key -- forwarded to server.popen_python only.
        clean_root -- accepted but not used by this method; the subclasses
            in this module hand it to the server through ctor_args.

        Raises AssertionError when the launched server's stderr reaches
        end-of-file before 'SERVER_READY.' is seen; everything read up to
        that point is included in the message.
        """
        if launch:
            launch_params = dict(
                classname = self._ServerClass.__name__,
                classmodule = self._ServerClassModule,
                ctor_args = ctor_args,
            )
            python_code = (
                "from %(classmodule)s import %(classname)s;"
                "s = %(classname)s%(ctor_args)r;"
                "s.run()"
            ) % launch_params
            proc = server.popen_python(python_code,
                        communication = communication,
                        host = host,
                        port = port,
                        user = user,
                        agent = agent,
                        ident_key = ident_key,
                        sudo = sudo,
                        environment_setup = environment_setup)
            # Wait for the server to be ready, otherwise nobody
            # will be able to connect to it
            err = []
            while True:
                helo = proc.stderr.readline()
                if helo == 'SERVER_READY.\n':
                    break
                if not helo:
                    # End-of-file before the ready marker: report what we got.
                    raise AssertionError(
                        "Expected 'SERVER_READY.', got: %s" % (''.join(err),))
                err.append(helo)
        # connect client to server
        self._client = server.Client(root_dir,
                communication = communication,
                host = host,
                port = port,
                user = user,
                agent = agent,
                sudo = sudo,
                environment_setup = environment_setup)

    @staticmethod
    def _make_message(argtypes, argencoders, command, methname, classname, *args):
        """
        Build the request string "<command>|<arg>|<arg>...".

        argtypes -- declared argument types; only used for their count and
            for error messages.
        argencoders -- one (encode, fmt) pair per argument; each argument
            is rendered as fmt % encode(value).
        command -- integer instruction code.
        methname, classname -- only used in error messages.
        args -- the values to encode.

        Raises ValueError when the three sequences differ in length and
        TypeError (with the nested traceback in the message) when an
        argument cannot be encoded.
        """
        if len(argtypes) != len(argencoders):
            raise ValueError("Invalid arguments for _make_message: "
                "in stub method %s of class %s "
                "argtypes and argencoders must match in size" % (
                    methname, classname ))
        if len(argtypes) != len(args):
            raise ValueError("Invalid arguments for _make_message: "
                "in stub method %s of class %s "
                "expected %d arguments, got %d" % (
                    methname, classname,
                    len(argtypes), len(args)))

        buf = []
        for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
            try:
                buf.append(fmt % encode(val))
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not turned into a TypeError.
                import traceback
                raise TypeError("Argument %d of stub method %s of class %s "
                    "requires a value of type %s, but got %s - nested error: %s" % (
                        argnum, methname, classname,
                        getattr(typ, '__name__', typ), type(val),
                        traceback.format_exc()))

        return "%d|%s" % (command, '|'.join(buf))

    @staticmethod
    def _parse_reply(rvtype, methname, classname, reply):
        """
        Decode a reply of the form "<code>|<text>".

        rvtype -- callable applied to <text> to build the return value, or
            None when the stub returns nothing.
        methname, classname -- only used in error messages.
        reply -- raw reply string.

        Returns rvtype(text) for an OK reply (None when rvtype is None).

        Raises RuntimeError for an empty reply, for an ERROR reply (the
        message is the base64-decoded text) and for an unknown code;
        raises TypeError when the reply or its text cannot be parsed.
        """
        if not reply:
            raise RuntimeError("Invalid reply: %r "
                "for stub method %s of class %s" % (
                    reply,
                    methname,
                    classname))

        try:
            result = reply.split("|")
            code = int(result[0])
            text = result[1]
        except Exception:
            # See _make_message: do not swallow KeyboardInterrupt/SystemExit.
            import traceback
            raise TypeError("Return value of stub method %s of class %s "
                "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
                    methname, classname,
                    getattr(rvtype, '__name__', rvtype), reply,
                    traceback.format_exc()))

        if code == ERROR:
            raise RuntimeError(base64.b64decode(text))
        if code != OK:
            raise RuntimeError("Invalid reply: %r "
                "for stub method %s of class %s - unknown code" % (
                    reply,
                    methname,
                    classname))

        if rvtype is None:
            return None
        try:
            return rvtype(text)
        except Exception:
            import traceback
            raise TypeError("Return value of stub method %s of class %s "
                "cannot be parsed: must be of type %s - nested error: %s" % (
                    methname, classname,
                    getattr(rvtype, '__name__', rvtype),
                    traceback.format_exc()))

    @staticmethod
    def _make_stubs(server_class, template_class):
        """
        Returns a dictionary method_name -> method
        with stub methods.

        Usage:

            class SomeProxy(BaseProxy):
               ...

               locals().update( BaseProxy._make_stubs(
                    ServerClass,
                    TemplateClass
               ) )

        ServerClass is the corresponding Server class, as
        specified in the _ServerClass class attribute (_make_stubs
        is static and can't access the attribute), and TemplateClass
        is the ultimate implementation class behind the server,
        from which argument names and defaults are taken, to
        maintain meaningful interfaces.

        A stub is generated for every public name of TemplateClass that
        ServerClass also has and that carries the _handles_command,
        _argtypes and _argencoders attributes. The source of each stub
        comes from the 'proxy_stub.tpl' file next to this module, which
        must define both <name> and <name>_deferred. Properties of
        TemplateClass become properties in the result.

        Side effect: for every stubbed name, TemplateClass gains a
        <name>_deferred method forwarding to <name>, unless it already
        has that attribute.
        """
        rv = {}

        # Marks arguments of the template method that have no default.
        NO_DEFAULT = object()

        import os.path
        func_template_path = os.path.join(
            os.path.dirname(__file__),
            'proxy_stub.tpl')
        func_template_file = open(func_template_path, "r")
        try:
            func_template = func_template_file.read()
        finally:
            # Close the template even when read() fails.
            func_template_file.close()

        def freezename(methname, dmethname):
            # Bind the names now, so each forwarder keeps its own pair.
            def dmeth(self, *p, **kw):
                return getattr(self, methname)(*p, **kw)
            dmeth.__name__ = dmethname
            return dmeth

        # Iterate over a snapshot: setattr() below adds names to template_class.
        for methname in vars(template_class).copy():
            if methname.endswith('_deferred'):
                # cannot wrap deferreds...
                continue
            if methname.startswith('_') or not hasattr(server_class, methname):
                continue

            dmethname = methname + '_deferred'
            template_meth = getattr(template_class, methname)
            server_meth = getattr(server_class, methname)

            command = getattr(server_meth, '_handles_command', None)
            argtypes = getattr(server_meth, '_argtypes', None)
            argencoders = getattr(server_meth, '_argencoders', None)
            rvtype = getattr(server_meth, '_retval', None)

            doprop = hasattr(template_meth, 'fget')
            if doprop:
                # property getter
                template_meth = template_meth.fget

            if command is None or argtypes is None or argencoders is None:
                # Not an interface method.
                continue

            code = template_meth.func_code
            argnames = code.co_varnames[:code.co_argcount]
            defaults = template_meth.func_defaults or ()
            # Left-pad with the marker so argdefaults lines up with argnames.
            argdefaults = (NO_DEFAULT,) * (len(argnames) - len(defaults)) + defaults

            func_globals = dict(
                BaseProxy = BaseProxy,
                argtypes = argtypes,
                argencoders = argencoders,
                rvtype = rvtype,
                functools = functools,
            )
            context = dict()

            func_text = func_template % dict(
                self = argnames[0],
                args = ','.join(argnames[1:]),
                argdefs = ','.join([
                    argname if argdef is NO_DEFAULT
                    else "%s=%r" % (argname, argdef)
                    for argname, argdef in zip(argnames[1:], argdefaults[1:])
                ]),
                command = command,
                methname = methname,
                classname = server_class.__name__
            )

            stub_code = compile(
                func_text,
                func_template_path,
                'exec')

            # eval() runs a code object compiled in 'exec' mode and returns
            # None, exactly like "exec stub_code in func_globals, context",
            # but this spelling is accepted by every Python version.
            eval(stub_code, func_globals, context)

            if doprop:
                rv[methname] = property(context[methname])
                rv[dmethname] = property(context[dmethname])
            else:
                rv[methname] = context[methname]
                rv[dmethname] = context[dmethname]

            # inject _deferred into core classes
            if not hasattr(template_class, dmethname):
                setattr(template_class, dmethname,
                    freezename(methname, dmethname))

        return rv
1242
class ExperimentSuiteProxy(BaseProxy):
    """
    Proxy for an ExperimentSuiteServer.

    The server is always launched on construction; the public methods are
    stubs generated from nepi.core.execute.ExperimentSuite.
    """

    _ServerClass = ExperimentSuiteServer

    def __init__(self, root_dir, log_level,
            xml, repetitions, duration, wait_guids,
            communication = DC.ACCESS_LOCAL,
            host = None,
            port = None,
            user = None,
            ident_key = None,
            agent = None,
            sudo = False,
            environment_setup = "",
            clean_root = False):
        """
        Launch an ExperimentSuiteServer and connect to it.

        All arguments, in the order of this signature, become the server's
        constructor arguments; the access-related ones are also forwarded
        to BaseProxy.__init__.
        """
        server_ctor_args = (
            root_dir, log_level,
            xml, repetitions, duration, wait_guids,
            communication, host, port, user, ident_key, agent, sudo,
            environment_setup, clean_root,
        )
        super(ExperimentSuiteProxy, self).__init__(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            launch = True,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup)

    # Inject the generated stubs; this must run before shutdown is
    # redefined below, which captures the generated stub as a default.
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentSuiteServer,
        template_class = nepi.core.execute.ExperimentSuite,
    ) )

    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then send a stop request through
        the client and wait for its reply. Returns the stub's result.
        """
        result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for it
        return result
1295
class TestbedControllerProxy(BaseProxy):
    """
    Proxy for a TestbedControllerServer.

    The public methods are stubs generated from
    nepi.core.execute.TestbedController.
    """

    _ServerClass = TestbedControllerServer

    def __init__(self, root_dir, log_level,
            testbed_id = None,
            testbed_version = None,
            launch = True,
            communication = DC.ACCESS_LOCAL,
            host = None,
            port = None,
            user = None,
            ident_key = None,
            agent = None,
            sudo = False,
            environment_setup = "",
            clean_root = False):
        """
        Optionally launch a TestbedControllerServer and connect to it.

        root_dir, log_level, testbed_id, testbed_version, environment_setup
        and clean_root become the server's constructor arguments; the
        access-related arguments are forwarded to BaseProxy.__init__.

        Raises RuntimeError when launch is true and testbed_id or
        testbed_version is None.
        """
        if launch and (testbed_id is None or testbed_version is None):
            raise RuntimeError("To launch a TestbedControllerServer a "
                    "testbed_id and testbed_version are required")
        server_ctor_args = (
            root_dir, log_level, testbed_id, testbed_version,
            environment_setup, clean_root,
        )
        super(TestbedControllerProxy, self).__init__(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            launch = launch,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup)

    # Inject the generated stubs; this must run before shutdown is
    # redefined below, which captures the generated stub as a default.
    locals().update( BaseProxy._make_stubs(
        server_class = TestbedControllerServer,
        template_class = nepi.core.execute.TestbedController,
    ) )

    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then send a stop request through
        the client and wait for its reply. Returns the stub's result.
        """
        rv = _stub(self)
        self._client.send_stop()
        self._client.read_reply() # wait for it
        return rv
1341     
1342
class ExperimentControllerProxy(BaseProxy):
    """
    Proxy for an ExperimentControllerServer.

    The public methods are stubs generated from
    nepi.core.execute.ExperimentController.
    """

    _ServerClass = ExperimentControllerServer

    def __init__(self, root_dir, log_level,
            experiment_xml = None,
            launch = True,
            communication = DC.ACCESS_LOCAL,
            host = None,
            port = None,
            user = None,
            ident_key = None,
            agent = None,
            sudo = False,
            environment_setup = "",
            clean_root = False):
        """
        Optionally launch an ExperimentControllerServer and connect to it.

        root_dir, log_level, experiment_xml, environment_setup and
        clean_root become the server's constructor arguments; the
        access-related arguments and clean_root are forwarded to
        BaseProxy.__init__.
        """
        server_ctor_args = (
            root_dir, log_level, experiment_xml, environment_setup,
            clean_root,
        )
        super(ExperimentControllerProxy, self).__init__(
            ctor_args = server_ctor_args,
            root_dir = root_dir,
            launch = launch,
            communication = communication,
            host = host,
            port = port,
            user = user,
            ident_key = ident_key,
            agent = agent,
            sudo = sudo,
            environment_setup = environment_setup,
            clean_root = clean_root)

    # Inject the generated stubs; this must run before shutdown is
    # redefined below, which captures the generated stub as a default.
    locals().update( BaseProxy._make_stubs(
        server_class = ExperimentControllerServer,
        template_class = nepi.core.execute.ExperimentController,
    ) )

    # Shutdown stops the serverside...
    def shutdown(self, _stub = shutdown):
        """
        Run the generated shutdown stub, then send a stop request through
        the client and wait for its reply. Returns the stub's result.
        """
        result = _stub(self)
        client = self._client
        client.send_stop()
        client.read_reply() # wait for it
        return result
1384