Ticket #29: introduce some "standard" box attributes to support testbed-in-testbed...
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP
8 import getpass
9 import cPickle
10 import sys
11 import time
12 import tempfile
13 import shutil
14
# PROTOCOL REPLIES
# Status code placed in the first field of every server reply.
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# Numeric instruction codes placed in the first field of every request.
# The values are part of the wire protocol: never renumber them.

# Codes handled only by the experiment controller server.
XML = 2
ACCESS = 3
FINISHED = 5
RECOVER = 30
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38

# Codes handled by both the experiment controller and testbed servers.
TRACE = 4
START = 6
STOP = 7
SHUTDOWN = 8

# Testbed codes that queue (defer) design information.
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17

# Testbed codes that trigger a deployment step.
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
DO_PRECONFIGURE = 31
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34

# Testbed codes that query or modify a running testbed.
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
GET_ATTRIBUTE_LIST = 32
TESTBED_ID = 35
TESTBED_VERSION = 36

# PARAMETER TYPE
# Tags sent alongside a value so the receiver can restore its Python type
# (see get_type / set_type).
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103
63
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Maps an instruction code to its "|"-separated message template. The
# instruction code is already filled in; the remaining placeholders are
# filled in by the sender with the instruction arguments.
controller_messages = {
    XML: "%d" % XML,
    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
    TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
    FINISHED: "%d|%s" % (FINISHED, "%d"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    RECOVER: "%d" % RECOVER,
    SHUTDOWN: "%d" % SHUTDOWN,
}
75
# TESTBED INSTANCE PROTOCOL MESSAGES
# Maps an instruction code to its "|"-separated message template. The
# instruction code is already filled in; the remaining placeholders are
# filled in by the sender with the instruction arguments, in the order the
# matching TestbedControllerServer handler reads them from params[1:].
testbed_messages = {
    TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    SHUTDOWN: "%d" % SHUTDOWN,
    CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
    CREATE: "%d|%s" % (CREATE, "%d|%s"),
    CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
    # NOTE(review): this template carries four fields, but
    # TestbedControllerServer.defer_factory_set only reads three
    # (name, value, type) - confirm which side is right.
    FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
    CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
    CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
    ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
    ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
    ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
    DO_SETUP: "%d" % DO_SETUP,
    DO_CREATE: "%d" % DO_CREATE,
    DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
    DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
    DO_CONFIGURE: "%d" % DO_CONFIGURE,
    DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
    DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
    DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
    GET: "%d|%s" % (GET, "%d|%s|%s"),
    SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
    EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
    EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
    # These two must carry their own instruction code: with the GET code the
    # server would dispatch them to get() instead of get_route/get_address.
    GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
    GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
    ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
    STATUS: "%d|%s" % (STATUS, "%s"),
    GUIDS: "%d" % GUIDS,
    # NOTE(review): TestbedControllerServer.get_attribute_list reads a guid
    # from params[1], but this template has no placeholder for it - confirm
    # against the sender before changing either side.
    GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST,
    TESTBED_ID: "%d" % TESTBED_ID,
    TESTBED_VERSION: "%d" % TESTBED_VERSION,
}
112
# Human-readable name of every reply code, instruction code and parameter
# type tag; used by log_msg / log_reply when writing debug logs.
instruction_text = {
    # replies
    OK: "OK",
    ERROR: "ERROR",
    # instructions
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    EXPERIMENT_SET: "EXPERIMENT_SET",
    EXPERIMENT_GET: "EXPERIMENT_GET",
    # parameter types
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
}
158
def get_type(value):
    """Return the protocol type tag (BOOL, INTEGER, FLOAT or STRING) for value.

    bool is tested before int because bool is a subclass of int. Anything
    that is not a bool, int or float is tagged as STRING.
    """
    if isinstance(value, bool):
        return BOOL
    if isinstance(value, int):
        return INTEGER
    if isinstance(value, float):
        return FLOAT
    return STRING
168
def set_type(type, value):
    """Convert a received value to the Python type named by a type tag.

    INTEGER and FLOAT go through int() / float(); BOOL is true only for the
    exact text "True"; any other tag yields str(value).
    """
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    if type == BOOL:
        return value == "True"
    return str(value)
179
def log_msg(server, params):
    """Write a received message to the server's debug log.

    params is the message split on "|": params[0] is the instruction code
    and the remaining items are its arguments. Codes missing from
    instruction_text are logged as "UNKNOWN(<code>)" instead of raising
    KeyError, so the caller can still answer with its own "invalid
    instruction" error.
    """
    instr = int(params[0])
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
        instr_txt, ", ".join(map(str, params[1:]))))
185
def log_reply(server, reply):
    """Write an outgoing reply to the server's debug log.

    reply has the form "<code>|<base64 payload>"; the code is translated
    through instruction_text and the payload is decoded before logging.
    """
    fields = reply.split("|")
    status_name = instruction_text[int(fields[0])]
    payload = base64.b64decode(fields[1])
    owner = server.__class__.__name__
    server.log_debug("%s - reply: %s %s" % (owner, status_name, payload))
193
def to_server_log_level(log_level):
    """Map an AccessConfiguration log level name to a server log level.

    Only AccessConfiguration.DEBUG_LEVEL selects server.DEBUG_LEVEL; every
    other value selects server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
198
def get_access_config_params(access_config):
    """Extract the connection parameters from an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), with
    log_level already converted by to_server_log_level. The last four
    items are None unless the "communication" attribute is
    AccessConfiguration.ACCESS_SSH.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") != AccessConfiguration.ACCESS_SSH:
        return (root_dir, log_level, None, None, None, None)
    user = read("user")
    host = read("host")
    port = read("port")
    agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
211
class AccessConfiguration(AttributesMap):
    """Attribute map describing how a controller instance is run and reached.

    The attributes registered here are: mode, communication, host, user,
    port, rootDirectory, useAgent, logLevel and recover.
    """

    # Allowed values for the "mode" attribute.
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    # Allowed values for the "communication" attribute.
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    # Allowed values for the "logLevel" attribute.
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        """Register every attribute with its default value and validator."""
        super(AccessConfiguration, self).__init__()
        cls = AccessConfiguration
        attribute_specs = (
            dict(name = "mode",
                help = "Instance execution mode",
                type = Attribute.ENUM,
                value = cls.MODE_SINGLE_PROCESS,
                allowed = [cls.MODE_DAEMON, cls.MODE_SINGLE_PROCESS],
                validation_function = validation.is_enum),
            dict(name = "communication",
                help = "Instance communication mode",
                type = Attribute.ENUM,
                value = cls.ACCESS_LOCAL,
                allowed = [cls.ACCESS_LOCAL, cls.ACCESS_SSH],
                validation_function = validation.is_enum),
            dict(name = "host",
                help = "Host where the testbed will be executed",
                type = Attribute.STRING,
                value = "localhost",
                validation_function = validation.is_string),
            dict(name = "user",
                help = "User on the Host to execute the testbed",
                type = Attribute.STRING,
                # defaults to the user running this process
                value = getpass.getuser(),
                validation_function = validation.is_string),
            dict(name = "port",
                help = "Port on the Host",
                type = Attribute.INTEGER,
                value = 22,
                validation_function = validation.is_integer),
            dict(name = "rootDirectory",
                help = "Root directory for storing process files",
                type = Attribute.STRING,
                value = ".",
                # TODO: validation.is_path
                validation_function = validation.is_string),
            dict(name = "useAgent",
                help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
                type = Attribute.BOOL,
                value = False,
                validation_function = validation.is_bool),
            dict(name = "logLevel",
                help = "Log level for instance",
                type = Attribute.ENUM,
                value = cls.ERROR_LEVEL,
                allowed = [cls.ERROR_LEVEL, cls.DEBUG_LEVEL],
                validation_function = validation.is_enum),
            dict(name = "recover",
                help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
                type = Attribute.BOOL,
                value = False,
                validation_function = validation.is_bool),
        )
        for spec in attribute_specs:
            self.add_attribute(**spec)
273
class TempDir(object):
    """Owns a freshly created temporary directory.

    The directory is created with tempfile.mkdtemp() on construction and
    removed, with its contents, when the object is finalized. Its location
    is available as the ``path`` attribute.
    """

    def __init__(self):
        self.path = tempfile.mkdtemp()
    
    def __del__(self):
        # __init__ may have failed before self.path was assigned, and the
        # directory may already be gone; a finalizer cannot report errors
        # to anyone, so clean up on a best-effort basis.
        path = getattr(self, "path", None)
        if path is not None:
            shutil.rmtree(path, ignore_errors = True)
280
class PermDir(object):
    """Holder for an existing directory path.

    Exposes the same ``path`` attribute as TempDir but never creates or
    removes anything.
    """

    def __init__(self, path):
        # stored as given; no validation is performed
        self.path = path
284
def create_controller(xml, access_config = None):
    """Create an experiment controller for the given experiment XML.

    With no access configuration, or with mode MODE_SINGLE_PROCESS, an
    in-process ExperimentController is returned. Its root directory is the
    "rootDirectory" attribute when available, otherwise a temporary
    directory that lives as long as the controller does. With mode
    MODE_DAEMON an ExperimentControllerProxy is returned instead.

    Raises ValueError when "recover" is requested in single-process mode,
    and RuntimeError for any other unsupported mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value("mode")
    # "recover" means: do not launch, reconnect to a running instance
    launch = True if not access_config \
            else not access_config.get_attribute_value("recover")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        
        # imported here rather than at module level
        from nepi.core.execute import ExperimentController
        
        if not access_config or not access_config.has_attribute("rootDirectory"):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
        controller = ExperimentController(xml, root_dir.path)
        
        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir
        
        return controller
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, 
                agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
314
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """Create a testbed controller for the given testbed id and version.

    With no access configuration, or with mode MODE_SINGLE_PROCESS, the
    testbed controller is built in-process. With mode MODE_DAEMON a
    TestbedControllerProxy is returned; it receives the value of the
    ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP attribute (or "" when that
    attribute is absent or empty) as environment_setup.

    Raises ValueError when "recover" is requested in single-process mode,
    and RuntimeError for any other unsupported mode.
    """
    mode = None if not access_config \
            else access_config.get_attribute_value("mode")
    # "recover" means: do not launch, reconnect to a running instance
    launch = True if not access_config \
            else not access_config.get_attribute_value("recover")
    environment_setup = ""
    if access_config \
            and access_config.has_attribute(ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP):
        environment_setup = access_config.get_attribute_value(
                ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP) or ""
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent, launch = launch,
                environment_setup = environment_setup)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
336
def _build_testbed_controller(testbed_id, testbed_version):
    """Instantiate the TestbedController of the module nepi.testbeds.<id>.

    The testbed id is lower-cased to form the module name; the module is
    imported on first use and its TestbedController is called with the
    requested version.
    """
    module_name = "nepi.testbeds.%s" % (testbed_id.lower())
    if module_name not in sys.modules:
        # __import__ returns the top-level package, so the submodule is
        # fetched from sys.modules afterwards
        __import__(module_name)
    testbed_module = sys.modules[module_name]
    return testbed_module.TestbedController(testbed_version)
343
class TestbedControllerServer(server.Server):
    """Server side of the testbed protocol.

    reply_action() receives one "|"-separated message whose first field is
    the instruction code (see testbed_messages), runs the matching handler
    against the wrapped testbed controller and returns a reply of the form
    "<OK|ERROR>|<base64 payload>". Every handler receives the full list of
    message fields, so params[0] is the instruction code and the
    arguments start at params[1].
    """

    # Instruction code -> name of the handler method. Names are resolved
    # with getattr() at dispatch time so subclasses can override handlers.
    _HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "defer_configure",
        CREATE: "defer_create",
        CREATE_SET: "defer_create_set",
        FACTORY_SET: "defer_factory_set",
        CONNECT: "defer_connect",
        CROSS_CONNECT: "defer_cross_connect",
        ADD_TRACE: "defer_add_trace",
        ADD_ADDRESS: "defer_add_address",
        ADD_ROUTE: "defer_add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT_INIT: "do_connect_init",
        DO_CONNECT_COMPL: "do_connect_compl",
        DO_CONFIGURE: "do_configure",
        DO_PRECONFIGURE: "do_preconfigure",
        DO_CROSS_CONNECT_INIT: "do_cross_connect_init",
        DO_CROSS_CONNECT_COMPL: "do_cross_connect_compl",
        GET: "get",
        SET: "set",
        GET_ADDRESS: "get_address",
        GET_ROUTE: "get_route",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
        GET_ATTRIBUTE_LIST: "get_attribute_list",
        TESTBED_ID: "testbed_id",
        TESTBED_VERSION: "testbed_version",
    }

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created in post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        """Build the wrapped testbed controller.

        Presumably invoked by server.Server once the process has been
        daemonized - confirm against nepi.util.server.
        """
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    def _ok(self, payload = ""):
        """Format a successful reply carrying payload (base64-encoded)."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    def _error(self, text):
        """Format an error reply carrying text (base64-encoded)."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction code or any exception
        raised while parsing or handling the message produces an ERROR
        reply instead of propagating.
        """
        if not msg:
            reply = self._error("Invalid command line")
        else:
            params = msg.split("|")
            try:
                # Parsing happens inside the try block so that a malformed
                # instruction code is answered with ERROR as well.
                instruction = int(params[0])
                log_msg(self, params)
                handler_name = self._HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._error(error)
                else:
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberately broad: whatever goes wrong is reported back
                # to the client. log_error() without arguments is expected
                # to return the text of the error being handled.
                error = self.log_error()
                reply = self._error(error)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """Reply with the pickled value of the testbed's guids attribute."""
        guids = self._testbed.guids
        return self._ok(cPickle.dumps(guids))

    def testbed_id(self, params):
        """Reply with the testbed's testbed_id as text."""
        return self._ok(str(self._testbed.testbed_id))

    def testbed_version(self, params):
        """Reply with the testbed's testbed_version as text."""
        return self._ok(str(self._testbed.testbed_version))

    def defer_create(self, params):
        """CREATE: params are guid, factory_id."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return self._ok()

    def trace(self, params):
        """TRACE: params are guid, trace_id, base64 attribute name."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        trace = self._testbed.trace(guid, trace_id, attribute)
        return self._ok(trace)

    def start(self, params):
        self._testbed.start()
        return self._ok()

    def stop(self, params):
        self._testbed.stop()
        return self._ok()

    def shutdown(self, params):
        self._testbed.shutdown()
        return self._ok()

    def defer_configure(self, params):
        """CONFIGURE: params are base64 name, base64 value, type tag."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type = int(params[3])
        value = set_type(type, value)
        self._testbed.defer_configure(name, value)
        return self._ok()

    def defer_create_set(self, params):
        """CREATE_SET: params are guid, base64 name, base64 value, type tag."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type = int(params[4])
        value = set_type(type, value)
        self._testbed.defer_create_set(guid, name, value)
        return self._ok()

    def defer_factory_set(self, params):
        """FACTORY_SET: reads base64 name, base64 value, type tag.

        NOTE(review): the FACTORY_SET template in testbed_messages carries
        four fields ("%d|%s|%s|%d"), while only three are read here -
        confirm the intended layout against the sender and the testbed
        API before relying on this handler.
        """
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type = int(params[3])
        value = set_type(type, value)
        self._testbed.defer_factory_set(name, value)
        return self._ok()

    def defer_connect(self, params):
        """CONNECT: params are guid1, connector name 1, guid2, connector name 2."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)
        return self._ok()

    def defer_cross_connect(self, params):
        """CROSS_CONNECT: queue a connection to an element of another testbed.

        Field layout follows the CROSS_CONNECT template in
        testbed_messages ("%d|%s|%d|%d|%s|%s|%s"): guid, connector type
        name, cross guid, cross testbed guid, cross testbed id, cross
        factory id, cross connector type name.
        """
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        cross_testbed_guid = int(params[4])
        cross_testbed_id = params[5]
        cross_factory_id = params[6]
        cross_connector_type_name = params[7]
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)
        return self._ok()

    def defer_add_trace(self, params):
        """ADD_TRACE: params are guid, trace_id."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return self._ok()

    def defer_add_address(self, params):
        """ADD_ADDRESS: params are guid, address, netprefix, broadcast."""
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)
        return self._ok()

    def defer_add_route(self, params):
        """ADD_ROUTE: params are guid, destination, netprefix, nexthop."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return self._ok()

    def do_setup(self, params):
        self._testbed.do_setup()
        return self._ok()

    def do_create(self, params):
        self._testbed.do_create()
        return self._ok()

    def do_connect_init(self, params):
        self._testbed.do_connect_init()
        return self._ok()

    def do_connect_compl(self, params):
        self._testbed.do_connect_compl()
        return self._ok()

    def do_configure(self, params):
        self._testbed.do_configure()
        return self._ok()

    def do_preconfigure(self, params):
        self._testbed.do_preconfigure()
        return self._ok()

    def do_cross_connect_init(self, params):
        """DO_CROSS_CONNECT_INIT: params[1] is base64 pickled cross data."""
        pcross_data = base64.b64decode(params[1])
        # NOTE(security): unpickling executes arbitrary code if the peer
        # is not trusted; the channel must only carry trusted data.
        cross_data = cPickle.loads(pcross_data)
        self._testbed.do_cross_connect_init(cross_data)
        return self._ok()

    def do_cross_connect_compl(self, params):
        """DO_CROSS_CONNECT_COMPL: params[1] is base64 pickled cross data."""
        pcross_data = base64.b64decode(params[1])
        # NOTE(security): see do_cross_connect_init.
        cross_data = cPickle.loads(pcross_data)
        self._testbed.do_cross_connect_compl(cross_data)
        return self._ok()

    def get(self, params):
        """GET: params are guid, base64 name, time; replies with str(value)."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        # the time field is forwarded as received (text)
        time = params[3]
        value = self._testbed.get(guid, name, time)
        return self._ok(str(value))

    def set(self, params):
        """SET: params are guid, base64 name, base64 value, type tag, time."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type = int(params[4])
        # the time field is forwarded as received (text)
        time = params[5]
        value = set_type(type, value)
        self._testbed.set(guid, name, value, time)
        return self._ok()

    def get_address(self, params):
        """GET_ADDRESS: params are guid, index, base64 attribute name."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        return self._ok(str(value))

    def get_route(self, params):
        """GET_ROUTE: params are guid, index, base64 attribute name."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        return self._ok(str(value))

    def action(self, params):
        """ACTION: params are time, guid, base64 command."""
        time = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(time, guid, command)
        return self._ok()

    def status(self, params):
        """STATUS: params[1] is a guid, or the text "None" for no guid."""
        guid = None
        if params[1] != "None":
            guid = int(params[1])
        status = self._testbed.status(guid)
        return self._ok(str(status))

    def get_attribute_list(self, params):
        """GET_ATTRIBUTE_LIST: replies with the pickled attribute list of a guid.

        NOTE(review): the GET_ATTRIBUTE_LIST template in testbed_messages
        has no guid field, so params[1] may be missing - confirm against
        the sender.
        """
        guid = int(params[1])
        attr_list = self._testbed.get_attribute_list(guid)
        return self._ok(cPickle.dumps(attr_list))
643
class ExperimentControllerServer(server.Server):
    """Server side of the experiment controller protocol.

    reply_action() receives one "|"-separated message whose first field is
    the instruction code (see controller_messages), runs the matching
    handler against the wrapped ExperimentController and returns a reply
    of the form "<OK|ERROR>|<base64 payload>". Every handler receives the
    full list of message fields, so params[0] is the instruction code and
    the arguments start at params[1].
    """

    # Instruction code -> name of the handler method. Names are resolved
    # with getattr() at dispatch time so subclasses can override handlers.
    _HANDLERS = {
        XML: "experiment_xml",
        ACCESS: "set_access_configuration",
        TRACE: "trace",
        FINISHED: "is_finished",
        EXPERIMENT_GET: "get",
        EXPERIMENT_SET: "set",
        START: "start",
        STOP: "stop",
        RECOVER: "recover",
        SHUTDOWN: "shutdown",
    }

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # created in post_daemonize()
        self._controller = None

    def post_daemonize(self):
        """Build the wrapped ExperimentController.

        Presumably invoked by server.Server once the process has been
        daemonized - confirm against nepi.util.server.
        """
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    def _ok(self, payload = ""):
        """Format a successful reply carrying payload (base64-encoded)."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    def _error(self, text):
        """Format an error reply carrying text (base64-encoded)."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction code or any exception
        raised while parsing or handling the message produces an ERROR
        reply instead of propagating.
        """
        if not msg:
            reply = self._error("Invalid command line")
        else:
            params = msg.split("|")
            try:
                # Parsing happens inside the try block so that a malformed
                # instruction code is answered with ERROR as well.
                instruction = int(params[0])
                log_msg(self, params)
                handler_name = self._HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._error(error)
                else:
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberately broad: whatever goes wrong is reported back
                # to the client. log_error() without arguments is expected
                # to return the text of the error being handled.
                error = self.log_error()
                reply = self._error(error)
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """XML: reply with the controller's experiment_xml."""
        xml = self._controller.experiment_xml
        return self._ok(xml)

    def set_access_configuration(self, params):
        """ACCESS: rebuild an AccessConfiguration and assign it to a testbed.

        params are testbed guid, mode, communication, host, user, port,
        root directory, use-agent flag ("True" means enabled) and log
        level.
        """
        testbed_guid = int(params[1])
        mode = params[2]
        communication = params[3]
        host = params[4]
        user = params[5]
        port = int(params[6])
        root_dir = params[7]
        use_agent = params[8] == "True"
        log_level = params[9]
        access_config = AccessConfiguration()
        access_config.set_attribute_value("mode", mode)
        access_config.set_attribute_value("communication", communication)
        access_config.set_attribute_value("host", host)
        access_config.set_attribute_value("user", user)
        access_config.set_attribute_value("port", port)
        access_config.set_attribute_value("rootDirectory", root_dir)
        access_config.set_attribute_value("useAgent", use_agent)
        access_config.set_attribute_value("logLevel", log_level)
        self._controller.set_access_configuration(testbed_guid, 
                access_config)
        return self._ok()

    def trace(self, params):
        """TRACE: params are testbed guid, guid, trace_id, base64 attribute."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
        return self._ok(trace)

    def is_finished(self, params):
        """FINISHED: params[1] is a guid; replies with str(is_finished(guid))."""
        guid = int(params[1])
        status = self._controller.is_finished(guid)
        return self._ok(str(status))

    def get(self, params):
        """EXPERIMENT_GET: params are testbed guid, guid, base64 name, time."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        # the time field is forwarded as received (text)
        time = params[4]
        value = self._controller.get(testbed_guid, guid, name, time)
        return self._ok(str(value))

    def set(self, params):
        """EXPERIMENT_SET: set an attribute of an element in a testbed.

        params follow the EXPERIMENT_SET template ("%d|%d|%s|%s|%d|%s"):
        testbed guid, guid, base64 name, base64 value, type tag, time.
        """
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        type = int(params[5])
        # the time field is forwarded as received (text)
        time = params[6]
        value = set_type(type, value)
        self._controller.set(testbed_guid, guid, name, value, time)
        return self._ok()

    def start(self, params):
        self._controller.start()
        return self._ok()

    def stop(self, params):
        self._controller.stop()
        return self._ok()

    def recover(self, params):
        self._controller.recover()
        return self._ok()

    def shutdown(self, params):
        self._controller.shutdown()
        return self._ok()
774
class TestbedControllerProxy(object):
    """Client-side proxy for a testbed controller running behind a server.

    Each public method fills in the matching ``testbed_messages`` template,
    sends it through ``self._client`` and decodes the "code|base64-text"
    reply. A reply whose code is ERROR becomes a RuntimeError carrying the
    decoded text.
    """

    def __init__(self, root_dir, log_level, testbed_id = None,
            testbed_version = None, launch = True, host = None,
            port = None, user = None, agent = None,
            environment_setup = ""):
        """Optionally launch the server, then connect a client to it.

        root_dir -- handed to the server constructor and to server.Client
        log_level -- handed to the server constructor
        testbed_id, testbed_version -- required when launch is true
        launch -- when true a TestbedControllerServer is started first:
            through ssh when host is given, otherwise by calling run() on a
            TestbedControllerServer created locally
        host, port, user, agent -- passed to server.popen_ssh_subprocess
            (when launching through ssh) and to server.Client
        environment_setup -- passed to server.popen_ssh_subprocess only

        Raises RuntimeError when launch is requested without testbed_id or
        testbed_version, or when the ssh subprocess already reports a
        non-zero exit status.
        """
        if launch:
            if testbed_id is None or testbed_version is None:
                raise RuntimeError("To launch a TesbedInstance server a \
                        testbed_id and testbed_version are required")
            # ssh
            if host is not None:
                # The snippet must import the very class it instantiates,
                # otherwise the remote interpreter cannot start the server.
                python_code = "from nepi.util.proxy import \
                        TestbedControllerServer;\
                        s = TestbedControllerServer('%s', %d, '%s', '%s');\
                        s.run()" % (root_dir, log_level, testbed_id,
                                testbed_version)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent,
                    environment_setup = environment_setup)
                # presumably a subprocess.Popen: poll() is only truthy once
                # the process has exited with a non-zero status -- confirm
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = TestbedControllerServer(root_dir, log_level, testbed_id,
                    testbed_version)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _request(self, msg):
        """Send msg, wait for the reply and return its decoded text.

        The reply is split on "|"; the first field is the integer reply code
        and the second the base64-encoded text. Raises RuntimeError with the
        decoded text when the code is ERROR.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def guids(self):
        """Unpickled payload of the reply to a GUIDS request."""
        text = self._request(testbed_messages[GUIDS])
        # NOTE(review): the payload is unpickled as received, which is only
        # safe while the server end of the connection is trusted.
        return cPickle.loads(text)

    @property
    def testbed_id(self):
        """Reply to a TESTBED_ID request, converted with int()."""
        # NOTE(review): the constructor embeds testbed_id as a quoted string
        # in the launch snippet; int() raises ValueError on non-numeric text
        # -- confirm what the server actually replies with.
        return int(self._request(testbed_messages[TESTBED_ID]))

    @property
    def testbed_version(self):
        """Reply to a TESTBED_VERSION request, converted with int()."""
        # NOTE(review): same concern as testbed_id -- the version is embedded
        # as a quoted string at launch time; confirm int() is intended.
        return int(self._request(testbed_messages[TESTBED_VERSION]))

    def defer_configure(self, name, value):
        """Send a CONFIGURE request for (name, value)."""
        msg = testbed_messages[CONFIGURE]
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (name, value, value_type))

    def defer_create(self, guid, factory_id):
        """Send a CREATE request for guid with the given factory_id."""
        msg = testbed_messages[CREATE]
        self._request(msg % (guid, factory_id))

    def defer_create_set(self, guid, name, value):
        """Send a CREATE_SET request for attribute name of guid."""
        msg = testbed_messages[CREATE_SET]
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (guid, name, value, value_type))

    def defer_factory_set(self, guid, name, value):
        """Send a FACTORY_SET request for attribute name of guid."""
        msg = testbed_messages[FACTORY_SET]
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (guid, name, value, value_type))

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Send a CONNECT request joining the two (guid, connector) pairs."""
        msg = testbed_messages[CONNECT]
        self._request(msg % (guid1, connector_type_name1, guid2,
            connector_type_name2))

    def defer_cross_connect(self, guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """Send a CROSS_CONNECT request with the given local and cross data."""
        msg = testbed_messages[CROSS_CONNECT]
        self._request(msg % (guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name))

    def defer_add_trace(self, guid, trace_id):
        """Send an ADD_TRACE request for trace_id on guid."""
        msg = testbed_messages[ADD_TRACE]
        self._request(msg % (guid, trace_id))

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Send an ADD_ADDRESS request for guid."""
        msg = testbed_messages[ADD_ADDRESS]
        self._request(msg % (guid, address, netprefix, broadcast))

    def defer_add_route(self, guid, destination, netprefix, nexthop):
        """Send an ADD_ROUTE request for guid."""
        msg = testbed_messages[ADD_ROUTE]
        self._request(msg % (guid, destination, netprefix, nexthop))

    def do_setup(self):
        """Send a DO_SETUP request."""
        self._request(testbed_messages[DO_SETUP])

    def do_create(self):
        """Send a DO_CREATE request."""
        self._request(testbed_messages[DO_CREATE])

    def do_connect_init(self):
        """Send a DO_CONNECT_INIT request."""
        self._request(testbed_messages[DO_CONNECT_INIT])

    def do_connect_compl(self):
        """Send a DO_CONNECT_COMPL request."""
        self._request(testbed_messages[DO_CONNECT_COMPL])

    def do_configure(self):
        """Send a DO_CONFIGURE request."""
        self._request(testbed_messages[DO_CONFIGURE])

    def do_preconfigure(self):
        """Send a DO_PRECONFIGURE request."""
        self._request(testbed_messages[DO_PRECONFIGURE])

    def do_cross_connect_init(self, cross_data):
        """Send a DO_CROSS_CONNECT_INIT request carrying cross_data.

        cross_data is pickled and then base64-encoded for transport.
        """
        msg = testbed_messages[DO_CROSS_CONNECT_INIT]
        pcross_data = cPickle.dumps(cross_data)
        cross_data = base64.b64encode(pcross_data)
        self._request(msg % (cross_data))

    def do_cross_connect_compl(self, cross_data):
        """Send a DO_CROSS_CONNECT_COMPL request carrying cross_data.

        cross_data is pickled and then base64-encoded for transport.
        """
        msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
        pcross_data = cPickle.dumps(cross_data)
        cross_data = base64.b64encode(pcross_data)
        self._request(msg % (cross_data))

    def start(self, time = TIME_NOW):
        """Send a START request; the time argument is not transmitted."""
        self._request(testbed_messages[START])

    def stop(self, time = TIME_NOW):
        """Send a STOP request; the time argument is not transmitted."""
        self._request(testbed_messages[STOP])

    def set(self, guid, name, value, time = TIME_NOW):
        """Send a SET request for attribute name of guid at the given time."""
        msg = testbed_messages[SET]
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (guid, name, value, value_type, time))

    def get(self, guid, name, time = TIME_NOW):
        """Send a GET request and return the decoded reply text."""
        msg = testbed_messages[GET]
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        return self._request(msg % (guid, name, time))

    def get_address(self, guid, index, attribute):
        """Send a GET_ADDRESS request and return the decoded reply text."""
        msg = testbed_messages[GET_ADDRESS]
        # avoid having "|" in these parameters
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, index, attribute))

    def get_route(self, guid, index, attribute):
        """Send a GET_ROUTE request and return the decoded reply text."""
        msg = testbed_messages[GET_ROUTE]
        # avoid having "|" in these parameters
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, index, attribute))

    def action(self, time, guid, action):
        """Send an ACTION request built from (time, guid, action)."""
        msg = testbed_messages[ACTION]
        self._request(msg % (time, guid, action))

    def status(self, guid = None):
        """Send a STATUS request and return the reply converted with int().

        guid is transmitted as str(guid), so the default travels as "None".
        """
        msg = testbed_messages[STATUS]
        return int(self._request(msg % str(guid)))

    def trace(self, guid, trace_id, attribute='value'):
        """Send a TRACE request and return the decoded reply text."""
        msg = testbed_messages[TRACE]
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, trace_id, attribute))

    def get_attribute_list(self, guid):
        """Send a GET_ATTRIBUTE_LIST request and return the unpickled reply."""
        msg = testbed_messages[GET_ATTRIBUTE_LIST]
        text = self._request(msg % (guid))
        # NOTE(review): the payload is unpickled as received, which is only
        # safe while the server end of the connection is trusted.
        return cPickle.loads(text)

    def shutdown(self):
        """Send a SHUTDOWN request, then stop the server connection."""
        self._request(testbed_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1184
class ExperimentControllerProxy(object):
    """Client-side proxy for an experiment controller behind a server.

    Each public method fills in a message template, sends it through
    ``self._client`` and decodes the "code|base64-text" reply. A failed
    request is reported as a RuntimeError carrying the decoded text.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            agent = None, environment_setup = ""):
        """Optionally launch the server, then connect a client to it.

        root_dir -- handed to the server constructor and to server.Client
        log_level -- handed to the server constructor
        experiment_xml -- required when launch is true
        launch -- when true an ExperimentControllerServer is started first:
            through ssh when host is given, otherwise by calling run() on an
            ExperimentControllerServer created locally
        host, port, user, agent -- passed to server.popen_ssh_subprocess
            (when launching through ssh) and to server.Client
        environment_setup -- passed to server.popen_ssh_subprocess only

        Raises RuntimeError when launch is requested without experiment_xml,
        or when the ssh subprocess already reports a non-zero exit status.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                raise RuntimeError("To launch a ExperimentControllerServer a \
                        xml description of the experiment is required")
            # ssh
            if host is not None:
                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
                        s = ExperimentControllerServer(%r, %r, %r);\
                        s.run()" % (root_dir, log_level, experiment_xml)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent,
                    environment_setup = environment_setup)
                # presumably a subprocess.Popen: poll() is only truthy once
                # the process has exited with a non-zero status -- confirm
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _exchange(self, msg):
        """Send msg, wait for the reply and return it as (code, text).

        The reply is split on "|"; the first field is the integer reply code
        and the second the base64-encoded text, returned decoded.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _request(self, msg):
        """Send msg and return the decoded reply text.

        Raises RuntimeError with the decoded text when the reply code is
        ERROR.
        """
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Decoded reply text of an XML request."""
        return self._request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS request built from access_config's attributes.

        The values are read with access_config.get_attribute_value() for the
        names "mode", "communication", "host", "user", "port",
        "rootDirectory", "useAgent" and "logLevel".
        """
        mode = access_config.get_attribute_value("mode")
        communication = access_config.get_attribute_value("communication")
        host = access_config.get_attribute_value("host")
        user = access_config.get_attribute_value("user")
        port = access_config.get_attribute_value("port")
        root_dir = access_config.get_attribute_value("rootDirectory")
        use_agent = access_config.get_attribute_value("useAgent")
        log_level = access_config.get_attribute_value("logLevel")
        msg = controller_messages[ACCESS]
        self._request(msg % (testbed_guid, mode, communication, host, user,
                port, root_dir, use_agent, log_level))

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE request and return the decoded reply text.

        Unlike the other requests, any reply code other than OK raises
        RuntimeError.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        code, text = self._exchange(
                msg % (testbed_guid, guid, trace_id, attribute))
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send a START request."""
        self._request(controller_messages[START])

    def stop(self):
        """Send a STOP request."""
        self._request(controller_messages[STOP])

    def recover(self):
        """Send a RECOVER request."""
        self._request(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send a FINISHED request; True when the reply text is "True"."""
        msg = controller_messages[FINISHED]
        return self._request(msg % guid) == "True"

    def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
        """Send an EXPERIMENT_SET request for attribute name of guid."""
        # NOTE(review): this template is looked up in testbed_messages while
        # the other requests of this class use controller_messages --
        # confirm that is where EXPERIMENT_SET is declared.
        msg = testbed_messages[EXPERIMENT_SET]
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (testbed_guid, guid, name, value, value_type,
                time))

    def get(self, testbed_guid, guid, name, time = TIME_NOW):
        """Send an EXPERIMENT_GET request and return the decoded reply text."""
        # NOTE(review): this template is looked up in testbed_messages while
        # the other requests of this class use controller_messages --
        # confirm that is where EXPERIMENT_GET is declared.
        msg = testbed_messages[EXPERIMENT_GET]
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        return self._request(msg % (testbed_guid, guid, name, time))

    def shutdown(self):
        """Send a SHUTDOWN request, then stop the server connection."""
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1343