mock cross_connect test added to test/core/integration.py
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import cPickle
10 import sys
11 import time
12 import tempfile
13 import shutil
14
# Numeric codes used in the "|"-separated text protocol spoken between the
# proxies and the servers defined in this module.  The message templates
# below embed these numbers, so both ends must agree on them.

# Reply codes
OK = 0
ERROR = 1

# Instruction codes
XML = 2
ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38

# Type tags attached to attribute values (see get_type / set_type)
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103
63
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Maps an instruction code to its message template.  The instruction code is
# already substituted; the remaining "%" placeholders are the "|"-separated
# arguments of that instruction.
controller_messages = {
    XML: "%d" % XML,
    ACCESS: "%d|" % ACCESS + "%d|%s|%s|%s|%s|%d|%s|%r|%s",
    TRACE: "%d|" % TRACE + "%d|%d|%s|%s",
    FINISHED: "%d|" % FINISHED + "%d",
    START: "%d" % START,
    STOP: "%d" % STOP,
    RECOVER: "%d" % RECOVER,
    SHUTDOWN: "%d" % SHUTDOWN,
}
75
# TESTBED INSTANCE PROTOCOL MESSAGES
# Maps an instruction code to its message template.  The instruction code is
# already substituted; the remaining "%" placeholders are the "|"-separated
# arguments of that instruction.
testbed_messages = dict({
    TRACE:  "%d|%s" % (TRACE, "%d|%s|%s"),
    START:  "%d" % START,
    STOP:   "%d" % STOP,
    SHUTDOWN:   "%d" % SHUTDOWN,
    CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
    CREATE: "%d|%s" % (CREATE, "%d|%s"),
    CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
    FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
    CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
    CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
    ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
    ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
    ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
    DO_SETUP:   "%d" % DO_SETUP,
    DO_CREATE:  "%d" % DO_CREATE,
    DO_CONNECT_INIT:    "%d" % DO_CONNECT_INIT,
    DO_CONNECT_COMPL:   "%d" % DO_CONNECT_COMPL,
    DO_CONFIGURE:       "%d" % DO_CONFIGURE,
    DO_PRECONFIGURE:    "%d" % DO_PRECONFIGURE,
    DO_CROSS_CONNECT_INIT:  "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
    DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
    GET:    "%d|%s" % (GET, "%d|%s|%s"),
    SET:    "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
    EXPERIMENT_GET:    "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
    EXPERIMENT_SET:    "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
    # These two used to carry the GET code, which made the server dispatch
    # them to get() instead of get_route() / get_address().
    GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
    GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
    ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
    STATUS: "%d|%s" % (STATUS, "%s"),
    GUIDS:  "%d" % GUIDS,
    # The server-side handler reads a guid from the first argument, so the
    # template needs a slot for it.
    GET_ATTRIBUTE_LIST:  "%d|%s" % (GET_ATTRIBUTE_LIST, "%d"),
    TESTBED_ID:  "%d" % TESTBED_ID,
    TESTBED_VERSION:  "%d" % TESTBED_VERSION,
   })
112
# Human readable name of every protocol code; used when logging messages
# and replies.
instruction_text = {
    # replies
    OK: "OK",
    ERROR: "ERROR",
    # instructions
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    GET: "GET",
    SET: "SET",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    RECOVER: "RECOVER",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    EXPERIMENT_SET: "EXPERIMENT_SET",
    EXPERIMENT_GET: "EXPERIMENT_GET",
    # parameter types
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
}
158
def get_type(value):
    """Return the protocol type tag (BOOL, INTEGER, FLOAT or STRING) for value.

    Anything that is not a bool, int or float is tagged as STRING.
    """
    # bool comes first: it is a subclass of int and would otherwise be
    # tagged as INTEGER.
    candidates = ((bool, BOOL), (int, INTEGER), (float, FLOAT))
    for python_type, tag in candidates:
        if isinstance(value, python_type):
            return tag
    return STRING
168
def set_type(type, value):
    """Convert value to the Python type named by the protocol tag type.

    INTEGER and FLOAT go through int() / float(), BOOL is true only for the
    exact text "True", and any other tag yields str(value).
    """
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    if type == BOOL:
        return value == "True"
    return str(value)
179
def log_msg(server, params):
    """Write a debug line describing an incoming message.

    params is the message already split on "|": the first item is the
    instruction code, the rest are its arguments.
    """
    name = instruction_text[int(params[0])]
    arguments = ", ".join([str(item) for item in params[1:]])
    line = "%s - msg: %s [%s]" % (server.__class__.__name__, name, arguments)
    server.log_debug(line)
185
def log_reply(server, reply):
    """Write a debug line describing an outgoing reply.

    reply has the form "<code>|<base64 payload>"; the payload is decoded
    before being logged.
    """
    fields = reply.split("|")
    name = instruction_text[int(fields[0])]
    payload = base64.b64decode(fields[1])
    line = "%s - reply: %s %s" % (server.__class__.__name__, name, payload)
    server.log_debug(line)
193
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into the server module's.

    Only the debug level maps to server.DEBUG_LEVEL; every other value maps
    to server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
198
def get_access_config_params(access_config):
    """Extract the connection parameters held by access_config.

    Returns the tuple (root_dir, log_level, user, host, port, agent).  The
    last four are None unless the communication mode is SSH.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") != AccessConfiguration.ACCESS_SSH:
        return (root_dir, log_level, None, None, None, None)
    user = read("user")
    host = read("host")
    port = read("port")
    agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
211
class AccessConfiguration(AttributesMap):
    """Attribute map describing how a controller instance is run and reached.

    The attributes cover the execution mode, the communication channel, the
    ssh endpoint (host, user, port, agent forwarding), the root directory,
    the log level and the recover flag.
    """

    # Values of the "mode" attribute
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    # Values of the "communication" attribute
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    # Values of the "logLevel" attribute
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        """Register every attribute together with its default value."""
        super(AccessConfiguration, self).__init__()
        cls = AccessConfiguration
        modes = [cls.MODE_DAEMON, cls.MODE_SINGLE_PROCESS]
        channels = [cls.ACCESS_LOCAL, cls.ACCESS_SSH]
        levels = [cls.ERROR_LEVEL, cls.DEBUG_LEVEL]

        self.add_attribute(
            name = "mode",
            help = "Instance execution mode",
            type = Attribute.ENUM,
            value = cls.MODE_SINGLE_PROCESS,
            allowed = modes,
            validation_function = validation.is_enum)
        self.add_attribute(
            name = "communication",
            help = "Instance communication mode",
            type = Attribute.ENUM,
            value = cls.ACCESS_LOCAL,
            allowed = channels,
            validation_function = validation.is_enum)
        self.add_attribute(
            name = "host",
            help = "Host where the testbed will be executed",
            type = Attribute.STRING,
            value = "localhost",
            validation_function = validation.is_string)
        self.add_attribute(
            name = "user",
            help = "User on the Host to execute the testbed",
            type = Attribute.STRING,
            value = getpass.getuser(),
            validation_function = validation.is_string)
        self.add_attribute(
            name = "port",
            help = "Port on the Host",
            type = Attribute.INTEGER,
            value = 22,
            validation_function = validation.is_integer)
        # TODO: validation.is_path
        self.add_attribute(
            name = "rootDirectory",
            help = "Root directory for storing process files",
            type = Attribute.STRING,
            value = ".",
            validation_function = validation.is_string)
        self.add_attribute(
            name = "useAgent",
            help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
            type = Attribute.BOOL,
            value = False,
            validation_function = validation.is_bool)
        self.add_attribute(
            name = "logLevel",
            help = "Log level for instance",
            type = Attribute.ENUM,
            value = cls.ERROR_LEVEL,
            allowed = levels,
            validation_function = validation.is_enum)
        self.add_attribute(
            name = "recover",
            help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
            type = Attribute.BOOL,
            value = False,
            validation_function = validation.is_bool)
273
class TempDir(object):
    """Owner of a freshly created temporary directory.

    The directory is created on construction and its path is exposed as
    ``path``.  It is removed, with everything inside it, when the object is
    destroyed.
    """

    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # "path" is missing when mkdtemp() itself failed; there is nothing
        # to clean up in that case.
        path = getattr(self, "path", None)
        if path is not None:
            # Best-effort cleanup: a directory that is already gone (or
            # cannot be fully removed) must not raise from a destructor.
            shutil.rmtree(path, ignore_errors=True)
280
class PermDir(object):
    """Plain holder for a directory path.

    Exposes the same ``path`` attribute as TempDir but defines no cleanup
    of its own.
    """

    def __init__(self, path):
        # Stored exactly as given.
        self.path = path
284
def create_controller(xml, access_config = None):
    """Build an experiment controller for the experiment described by xml.

    Without an access configuration, or in single process mode, an
    in-process ExperimentController is returned.  In daemon mode an
    ExperimentControllerProxy is returned instead.

    Raises ValueError when recovery is requested in single process mode and
    RuntimeError for any other execution mode.
    """
    mode = None
    launch = True
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute("rootDirectory"):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
314
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """Build a testbed controller for the given testbed id and version.

    Without an access configuration, or in single process mode, the
    controller is built in-process.  In daemon mode a
    TestbedControllerProxy is returned instead.

    Raises ValueError when recovery is requested in single process mode and
    RuntimeError for any other execution mode.
    """
    mode = None
    launch = True
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
331
def _build_testbed_controller(testbed_id, testbed_version):
    """Instantiate the TestbedController of the package for testbed_id.

    The package is nepi.testbeds.<testbed_id in lower case>; it is imported
    first if it is not loaded yet.
    """
    module_name = "nepi.testbeds.%s" % (testbed_id.lower(),)
    if module_name not in sys.modules:
        __import__(module_name)
    return sys.modules[module_name].TestbedController(testbed_version)
338
class TestbedControllerServer(server.Server):
    """Server that executes protocol messages on a local testbed controller.

    Every message is a "|"-separated string whose first field is an
    instruction code.  The reply is "<OK or ERROR>|<base64 payload>".
    """

    # Instruction code -> name of the method handling it.
    _HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "defer_configure",
        CREATE: "defer_create",
        CREATE_SET: "defer_create_set",
        FACTORY_SET: "defer_factory_set",
        CONNECT: "defer_connect",
        CROSS_CONNECT: "defer_cross_connect",
        ADD_TRACE: "defer_add_trace",
        ADD_ADDRESS: "defer_add_address",
        ADD_ROUTE: "defer_add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT_INIT: "do_connect_init",
        DO_CONNECT_COMPL: "do_connect_compl",
        DO_CONFIGURE: "do_configure",
        DO_PRECONFIGURE: "do_preconfigure",
        DO_CROSS_CONNECT_INIT: "do_cross_connect_init",
        DO_CROSS_CONNECT_COMPL: "do_cross_connect_compl",
        GET: "get",
        SET: "set",
        GET_ADDRESS: "get_address",
        GET_ROUTE: "get_route",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
        GET_ATTRIBUTE_LIST: "get_attribute_list",
        TESTBED_ID: "testbed_id",
        TESTBED_VERSION: "testbed_version",
    }

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # Built in post_daemonize().
        self._testbed = None

    def post_daemonize(self):
        """Create the testbed controller this server drives."""
        self._testbed = _build_testbed_controller(self._testbed_id, 
                self._testbed_version)

    def _reply_ok(self, payload = ""):
        """Format an OK reply carrying payload (base64 encoded)."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    def _reply_error(self, text):
        """Format an ERROR reply carrying text (base64 encoded)."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        Any exception raised while parsing or executing the message is
        logged and reported to the peer as an ERROR reply.
        """
        if not msg:
            reply = self._reply_error("Invalid command line")
        else:
            params = msg.split("|")
            # Parsing and logging happen inside the try block so that a
            # malformed or unknown instruction code also produces an ERROR
            # reply instead of an exception.
            try:
                instruction = int(params[0])
                handler = self._HANDLERS.get(instruction)
                if handler is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._reply_error(error)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler)(params)
            except:
                # Kept as broad as the original handler.
                error = self.log_error()
                reply = self._reply_error(error)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """Reply with the pickled guids of the testbed."""
        return self._reply_ok(cPickle.dumps(self._testbed.guids))

    def testbed_id(self, params):
        """Reply with the testbed id as text."""
        return self._reply_ok(str(self._testbed.testbed_id))

    def testbed_version(self, params):
        """Reply with the testbed version as text."""
        return self._reply_ok(str(self._testbed.testbed_version))

    def defer_create(self, params):
        """params: guid, factory_id."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return self._reply_ok()

    def trace(self, params):
        """params: guid, trace_id, attribute (base64)."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        trace = self._testbed.trace(guid, trace_id, attribute)
        return self._reply_ok(trace)

    def start(self, params):
        self._testbed.start()
        return self._reply_ok()

    def stop(self, params):
        self._testbed.stop()
        return self._reply_ok()

    def shutdown(self, params):
        self._testbed.shutdown()
        return self._reply_ok()

    def defer_configure(self, params):
        """params: name (base64), value (base64), type tag."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        value = set_type(int(params[3]), value)
        self._testbed.defer_configure(name, value)
        return self._reply_ok()

    def defer_create_set(self, params):
        """params: guid, name (base64), value (base64), type tag."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        value = set_type(int(params[4]), value)
        self._testbed.defer_create_set(guid, name, value)
        return self._reply_ok()

    def defer_factory_set(self, params):
        """params: name (base64), value (base64), type tag."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        value = set_type(int(params[3]), value)
        self._testbed.defer_factory_set(name, value)
        return self._reply_ok()

    def defer_connect(self, params):
        """params: guid1, connector_type_name1, guid2, connector_type_name2."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)
        return self._reply_ok()

    def defer_cross_connect(self, params):
        """params: guid, connector_type_name, cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id, cross_connector_type_name.

        The indexes follow the seven fields of the CROSS_CONNECT template.
        """
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        cross_testbed_guid = int(params[4])
        cross_testbed_id = params[5]
        cross_factory_id = params[6]
        cross_connector_type_name = params[7]
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
            cross_connector_type_name)
        return self._reply_ok()

    def defer_add_trace(self, params):
        """params: guid, trace_id."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return self._reply_ok()

    def defer_add_address(self, params):
        """params: guid, address, netprefix, broadcast."""
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)
        return self._reply_ok()

    def defer_add_route(self, params):
        """params: guid, destination, netprefix, nexthop."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return self._reply_ok()

    def do_setup(self, params):
        self._testbed.do_setup()
        return self._reply_ok()

    def do_create(self, params):
        self._testbed.do_create()
        return self._reply_ok()

    def do_connect_init(self, params):
        self._testbed.do_connect_init()
        return self._reply_ok()

    def do_connect_compl(self, params):
        self._testbed.do_connect_compl()
        return self._reply_ok()

    def do_configure(self, params):
        self._testbed.do_configure()
        return self._reply_ok()

    def do_preconfigure(self, params):
        self._testbed.do_preconfigure()
        return self._reply_ok()

    def do_cross_connect_init(self, params):
        """params: pickled cross data (base64)."""
        # NOTE(review): unpickles data received from the peer; only safe
        # when the channel is trusted.
        cross_data = cPickle.loads(base64.b64decode(params[1]))
        self._testbed.do_cross_connect_init(cross_data)
        return self._reply_ok()

    def do_cross_connect_compl(self, params):
        """params: pickled cross data (base64)."""
        # NOTE(review): unpickles data received from the peer; only safe
        # when the channel is trusted.
        cross_data = cPickle.loads(base64.b64decode(params[1]))
        self._testbed.do_cross_connect_compl(cross_data)
        return self._reply_ok()

    def get(self, params):
        """params: guid, name (base64), time."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        when = params[3]
        value = self._testbed.get(guid, name, when)
        return self._reply_ok(str(value))

    def set(self, params):
        """params: guid, name (base64), value (base64), type tag, time."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        value = set_type(int(params[4]), value)
        when = params[5]
        self._testbed.set(guid, name, value, when)
        return self._reply_ok()

    def get_address(self, params):
        """params: guid, index, attribute (base64)."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        return self._reply_ok(str(value))

    def get_route(self, params):
        """params: guid, index, attribute (base64)."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        return self._reply_ok(str(value))

    def action(self, params):
        """params: time, guid, command (base64)."""
        when = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(when, guid, command)
        return self._reply_ok()

    def status(self, params):
        """params: guid, or the text "None" to pass no guid."""
        guid = None
        if params[1] != "None":
            guid = int(params[1])
        status = self._testbed.status(guid)
        return self._reply_ok(str(status))

    def get_attribute_list(self, params):
        """params: guid.  Replies with the pickled attribute list."""
        guid = int(params[1])
        attr_list = self._testbed.get_attribute_list(guid)
        return self._reply_ok(cPickle.dumps(attr_list))
638
class ExperimentControllerServer(server.Server):
    """Server that executes protocol messages on a local ExperimentController.

    Every message is a "|"-separated string whose first field is an
    instruction code.  The reply is "<OK or ERROR>|<base64 payload>".
    """

    # Instruction code -> name of the method handling it.
    _HANDLERS = {
        XML: "experiment_xml",
        ACCESS: "set_access_configuration",
        TRACE: "trace",
        FINISHED: "is_finished",
        EXPERIMENT_GET: "get",
        EXPERIMENT_SET: "set",
        START: "start",
        STOP: "stop",
        RECOVER: "recover",
        SHUTDOWN: "shutdown",
    }

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # Built in post_daemonize().
        self._controller = None

    def post_daemonize(self):
        """Create the experiment controller this server drives."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    def _reply_ok(self, payload = ""):
        """Format an OK reply carrying payload (base64 encoded)."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    def _reply_error(self, text):
        """Format an ERROR reply carrying text (base64 encoded)."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        Any exception raised while parsing or executing the message is
        logged and reported to the peer as an ERROR reply.
        """
        if not msg:
            reply = self._reply_error("Invalid command line")
        else:
            params = msg.split("|")
            # Parsing and logging happen inside the try block so that a
            # malformed or unknown instruction code also produces an ERROR
            # reply instead of an exception.
            try:
                instruction = int(params[0])
                handler = self._HANDLERS.get(instruction)
                if handler is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._reply_error(error)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler)(params)
            except:
                # Kept as broad as the original handler.
                error = self.log_error()
                reply = self._reply_error(error)
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """Reply with the experiment XML held by the controller."""
        return self._reply_ok(self._controller.experiment_xml)

    def set_access_configuration(self, params):
        """params: testbed_guid, mode, communication, host, user, port,
        root_dir, use_agent, log_level."""
        testbed_guid = int(params[1])
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            ("useAgent", params[8] == "True"),
            ("logLevel", params[9]),
        )
        access_config = AccessConfiguration()
        for name, value in settings:
            access_config.set_attribute_value(name, value)
        self._controller.set_access_configuration(testbed_guid, 
                access_config)
        return self._reply_ok()

    def trace(self, params):
        """params: testbed_guid, guid, trace_id, attribute (base64)."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
        return self._reply_ok(trace)

    def is_finished(self, params):
        """params: guid."""
        guid = int(params[1])
        status = self._controller.is_finished(guid)
        return self._reply_ok(str(status))

    def get(self, params):
        """params: testbed_guid, guid, name (base64), time."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        when = params[4]
        value = self._controller.get(testbed_guid, guid, name, when)
        return self._reply_ok(str(value))

    def set(self, params):
        """params: testbed_guid, guid, name (base64), value (base64),
        type tag, time."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        value = set_type(int(params[5]), value)
        when = params[6]
        self._controller.set(testbed_guid, guid, name, value, when)
        return self._reply_ok()

    def start(self, params):
        self._controller.start()
        return self._reply_ok()

    def stop(self, params):
        self._controller.stop()
        return self._reply_ok()

    def recover(self, params):
        self._controller.recover()
        return self._reply_ok()

    def shutdown(self, params):
        self._controller.shutdown()
        return self._reply_ok()
769
class TestbedControllerProxy(object):
    """Client-side proxy that talks to a TestbedControllerServer.

    Each public method fills in the matching ``testbed_messages`` format,
    sends it through a ``server.Client`` and decodes the reply. A reply is
    ``<code>|<base64 text>``; an ERROR code is raised as a RuntimeError
    carrying the decoded text.
    """

    def __init__(self, root_dir, log_level, testbed_id = None,
            testbed_version = None, launch = True, host = None,
            port = None, user = None, agent = None):
        """Optionally launch the server, then connect a client to it.

        When ``launch`` is true the server is started through ssh if ``host``
        is given, otherwise by running a TestbedControllerServer from this
        process.

        Raises RuntimeError if ``launch`` is requested without testbed_id and
        testbed_version, or if the ssh subprocess has already exited with a
        non-zero status.
        """
        if launch:
            if testbed_id is None or testbed_version is None:
                raise RuntimeError("To launch a TestbedControllerServer a "
                        "testbed_id and testbed_version are required")
            if host is not None:
                # ssh
                # The import has to name the class that is instantiated.
                # %r lets Python quote and escape the arguments.
                python_code = (
                    "from nepi.util.proxy import TestbedControllerServer;"
                    "s = TestbedControllerServer(%r, %r, %r, %r);"
                    "s.run()"
                ) % (root_dir, log_level, testbed_id, testbed_version)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent)
                # poll() is only truthy for a non-zero exit status; a process
                # that is still running (None) or exited with 0 passes.
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = TestbedControllerServer(root_dir, log_level, testbed_id,
                    testbed_version)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _request(self, msg):
        """Send ``msg`` and return the base64-decoded text of the reply.

        Raises RuntimeError with the decoded text when the reply code is
        ERROR.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @staticmethod
    def _encode_attribute(name, value):
        """Return (base64 name, base64 str(value), get_type(value))."""
        value_type = get_type(value)
        # base64 keeps the "|" field separator out of these parameters
        return (base64.b64encode(name), base64.b64encode(str(value)),
                value_type)

    @property
    def guids(self):
        """Unpickled payload of the GUIDS reply."""
        text = self._request(testbed_messages[GUIDS])
        # NOTE(review): unpickling the reply is only safe if the server is
        # trusted.
        return cPickle.loads(text)

    @property
    def testbed_id(self):
        """Payload of the TESTBED_ID reply converted with int()."""
        text = self._request(testbed_messages[TESTBED_ID])
        # NOTE(review): __init__ passes testbed_id to the server as a string;
        # int() raises ValueError for a non-numeric id -- confirm what the
        # server actually sends.
        return int(text)

    @property
    def testbed_version(self):
        """Payload of the TESTBED_VERSION reply converted with int()."""
        text = self._request(testbed_messages[TESTBED_VERSION])
        # NOTE(review): same concern as testbed_id -- confirm the version is
        # always numeric.
        return int(text)

    def defer_configure(self, name, value):
        """Send a CONFIGURE message for ``name``/``value``."""
        msg = testbed_messages[CONFIGURE]
        name, value, value_type = self._encode_attribute(name, value)
        self._request(msg % (name, value, value_type))

    def defer_create(self, guid, factory_id):
        """Send a CREATE message for ``guid`` and ``factory_id``."""
        msg = testbed_messages[CREATE]
        self._request(msg % (guid, factory_id))

    def defer_create_set(self, guid, name, value):
        """Send a CREATE_SET message for ``name``/``value`` on ``guid``."""
        msg = testbed_messages[CREATE_SET]
        name, value, value_type = self._encode_attribute(name, value)
        self._request(msg % (guid, name, value, value_type))

    def defer_factory_set(self, guid, name, value):
        """Send a FACTORY_SET message for ``name``/``value`` on ``guid``."""
        msg = testbed_messages[FACTORY_SET]
        name, value, value_type = self._encode_attribute(name, value)
        self._request(msg % (guid, name, value, value_type))

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Send a CONNECT message for the two guid/connector pairs."""
        msg = testbed_messages[CONNECT]
        self._request(msg % (guid1, connector_type_name1, guid2,
            connector_type_name2))

    def defer_cross_connect(self, guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """Send a CROSS_CONNECT message with the local and cross-side data."""
        msg = testbed_messages[CROSS_CONNECT]
        self._request(msg % (guid, connector_type_name, cross_guid,
            cross_testbed_guid, cross_testbed_id, cross_factory_id,
            cross_connector_type_name))

    def defer_add_trace(self, guid, trace_id):
        """Send an ADD_TRACE message for ``trace_id`` on ``guid``."""
        msg = testbed_messages[ADD_TRACE]
        self._request(msg % (guid, trace_id))

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Send an ADD_ADDRESS message for ``guid``."""
        msg = testbed_messages[ADD_ADDRESS]
        self._request(msg % (guid, address, netprefix, broadcast))

    def defer_add_route(self, guid, destination, netprefix, nexthop):
        """Send an ADD_ROUTE message for ``guid``."""
        msg = testbed_messages[ADD_ROUTE]
        self._request(msg % (guid, destination, netprefix, nexthop))

    def do_setup(self):
        """Send DO_SETUP and wait for the reply."""
        self._request(testbed_messages[DO_SETUP])

    def do_create(self):
        """Send DO_CREATE and wait for the reply."""
        self._request(testbed_messages[DO_CREATE])

    def do_connect_init(self):
        """Send DO_CONNECT_INIT and wait for the reply."""
        self._request(testbed_messages[DO_CONNECT_INIT])

    def do_connect_compl(self):
        """Send DO_CONNECT_COMPL and wait for the reply."""
        self._request(testbed_messages[DO_CONNECT_COMPL])

    def do_configure(self):
        """Send DO_CONFIGURE and wait for the reply."""
        self._request(testbed_messages[DO_CONFIGURE])

    def do_preconfigure(self):
        """Send DO_PRECONFIGURE and wait for the reply."""
        self._request(testbed_messages[DO_PRECONFIGURE])

    def do_cross_connect_init(self, cross_data):
        """Send DO_CROSS_CONNECT_INIT with ``cross_data`` pickled + base64."""
        msg = testbed_messages[DO_CROSS_CONNECT_INIT]
        encoded = base64.b64encode(cPickle.dumps(cross_data))
        self._request(msg % encoded)

    def do_cross_connect_compl(self, cross_data):
        """Send DO_CROSS_CONNECT_COMPL with ``cross_data`` pickled + base64."""
        msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
        encoded = base64.b64encode(cPickle.dumps(cross_data))
        self._request(msg % encoded)

    def start(self, time = TIME_NOW):
        """Send START; ``time`` is accepted but not transmitted."""
        self._request(testbed_messages[START])

    def stop(self, time = TIME_NOW):
        """Send STOP; ``time`` is accepted but not transmitted."""
        self._request(testbed_messages[STOP])

    def set(self, guid, name, value, time = TIME_NOW):
        """Send a SET message for ``name``/``value`` on ``guid`` at ``time``."""
        msg = testbed_messages[SET]
        name, value, value_type = self._encode_attribute(name, value)
        self._request(msg % (guid, name, value, value_type, time))

    def get(self, guid, name, time = TIME_NOW):
        """Send a GET message and return the decoded reply text."""
        msg = testbed_messages[GET]
        # base64 keeps the "|" field separator out of this parameter
        name = base64.b64encode(name)
        return self._request(msg % (guid, name, time))

    def get_address(self, guid, index, attribute):
        """Send a GET_ADDRESS message and return the decoded reply text."""
        msg = testbed_messages[GET_ADDRESS]
        # base64 keeps the "|" field separator out of this parameter
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, index, attribute))

    def get_route(self, guid, index, attribute):
        """Send a GET_ROUTE message and return the decoded reply text."""
        msg = testbed_messages[GET_ROUTE]
        # base64 keeps the "|" field separator out of this parameter
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, index, attribute))

    def action(self, time, guid, action):
        """Send an ACTION message built from ``time``, ``guid``, ``action``."""
        msg = testbed_messages[ACTION]
        self._request(msg % (time, guid, action))

    def status(self, guid = None):
        """Send a STATUS message and return the reply converted with int().

        ``guid`` is sent as ``str(guid)``, so the default is transmitted as
        the text "None".
        """
        msg = testbed_messages[STATUS]
        return int(self._request(msg % str(guid)))

    def trace(self, guid, trace_id, attribute='value'):
        """Send a TRACE message and return the decoded reply text."""
        msg = testbed_messages[TRACE]
        attribute = base64.b64encode(attribute)
        return self._request(msg % (guid, trace_id, attribute))

    def get_attribute_list(self, guid):
        """Send GET_ATTRIBUTE_LIST and return the unpickled reply."""
        msg = testbed_messages[GET_ATTRIBUTE_LIST]
        text = self._request(msg % (guid))
        # NOTE(review): unpickling the reply is only safe if the server is
        # trusted.
        return cPickle.loads(text)

    def shutdown(self):
        """Send SHUTDOWN, then stop the client connection."""
        self._request(testbed_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1177
class ExperimentControllerProxy(object):
    """Client-side proxy that talks to an ExperimentControllerServer.

    Each public method fills in a message format, sends it through a
    ``server.Client`` and decodes the reply. A reply is
    ``<code>|<base64 text>``; an ERROR code is raised as a RuntimeError
    carrying the decoded text.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            agent = None):
        """Optionally launch the server, then connect a client to it.

        When ``launch`` is true the server is started through ssh if ``host``
        is given, otherwise by running an ExperimentControllerServer from
        this process.

        Raises RuntimeError if ``launch`` is requested without
        ``experiment_xml``, or if the ssh subprocess has already exited with
        a non-zero status.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            if host is not None:
                # ssh
                python_code = (
                    "from nepi.util.proxy import ExperimentControllerServer;"
                    "s = ExperimentControllerServer(%r, %r, %r);"
                    "s.run()"
                ) % (root_dir, log_level, experiment_xml)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent)
                # poll() is only truthy for a non-zero exit status; a process
                # that is still running (None) or exited with 0 passes.
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level,
                        experiment_xml)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _exchange(self, msg):
        """Send ``msg`` and return ``(code, decoded text)`` from the reply."""
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _request(self, msg):
        """Send ``msg`` and return the decoded reply text.

        Raises RuntimeError with the decoded text when the reply code is
        ERROR.
        """
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Decoded text of the XML reply."""
        return self._request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS message built from ``access_config`` attributes.

        Reads "mode", "communication", "host", "user", "port",
        "rootDirectory", "useAgent" and "logLevel" through
        ``access_config.get_attribute_value``.
        """
        read = access_config.get_attribute_value
        mode = read("mode")
        communication = read("communication")
        host = read("host")
        user = read("user")
        port = read("port")
        root_dir = read("rootDirectory")
        use_agent = read("useAgent")
        log_level = read("logLevel")
        msg = controller_messages[ACCESS]
        self._request(msg % (testbed_guid, mode, communication, host, user,
                port, root_dir, use_agent, log_level))

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE message and return the decoded reply text.

        Unlike the other methods, any reply code other than OK raises
        RuntimeError.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        code, text = self._exchange(
                msg % (testbed_guid, guid, trace_id, attribute))
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send START and wait for the reply."""
        self._request(controller_messages[START])

    def stop(self):
        """Send STOP and wait for the reply."""
        self._request(controller_messages[STOP])

    def recover(self):
        """Send RECOVER and wait for the reply."""
        self._request(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send FINISHED for ``guid``; True when the reply text is "True"."""
        msg = controller_messages[FINISHED]
        return self._request(msg % guid) == "True"

    def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
        """Send an EXPERIMENT_SET message for ``name``/``value``."""
        # NOTE(review): the format is looked up in testbed_messages, not
        # controller_messages -- confirm that is where it is defined.
        msg = testbed_messages[EXPERIMENT_SET]
        value_type = get_type(value)
        # base64 keeps the "|" field separator out of these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        self._request(msg % (testbed_guid, guid, name, value, value_type,
                time))

    def get(self, testbed_guid, guid, name, time = TIME_NOW):
        """Send an EXPERIMENT_GET message and return the decoded reply text."""
        # NOTE(review): the format is looked up in testbed_messages, not
        # controller_messages -- confirm that is where it is defined.
        msg = testbed_messages[EXPERIMENT_GET]
        # base64 keeps the "|" field separator out of this parameter
        name = base64.b64encode(name)
        return self._request(msg % (testbed_guid, guid, name, time))

    def shutdown(self):
        """Send SHUTDOWN, then stop the client connection."""
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1335