e8ca4652b014bdebe08302e6ec4cff6d29765960
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import sys
9
10 # PROTOCOL REPLIES
11 OK = 0
12 ERROR = 1
13
14 # PROTOCOL INSTRUCTION MESSAGES
15 XML = 2 
16 ACCESS  = 3
17 TRACE   = 4
18 FINISHED    = 5
19 START   = 6
20 STOP    = 7
21 SHUTDOWN    = 8
22 CONFIGURE   = 9
23 CREATE      = 10
24 CREATE_SET  = 11
25 FACTORY_SET = 12
26 CONNECT     = 13
27 CROSS_CONNECT   = 14
28 ADD_TRACE   = 15
29 ADD_ADDRESS = 16
30 ADD_ROUTE   = 17
31 DO_SETUP    = 18
32 DO_CREATE   = 19
33 DO_CONNECT  = 20
34 DO_CONFIGURE    = 21
35 DO_CROSS_CONNECT    = 22
36 GET = 23
37 SET = 24
38 ACTION  = 25
39 STATUS  = 26
40 GUIDS  = 27
41
42 # PARAMETER TYPE
43 STRING  =  100
44 INTEGER = 101
45 BOOL    = 102
46 FLOAT   = 103
47
48 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
49 controller_messages = dict({
50     XML:    "%d" % XML,
51     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
52     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
53     FINISHED:   "%d|%s" % (FINISHED, "%d"),
54     START:  "%d" % START,
55     STOP:   "%d" % STOP,
56     SHUTDOWN:   "%d" % SHUTDOWN,
57     })
58
59 # TESTBED INSTANCE PROTOCOL MESSAGES
60 testbed_messages = dict({
61     TRACE:  "%d|%s" % (TRACE, "%d|%s"),
62     START:  "%d|%s" % (START, "%s"),
63     STOP:    "%d|%s" % (STOP, "%s"),
64     SHUTDOWN:   "%d" % SHUTDOWN,
65     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
66     CREATE: "%d|%s" % (CREATE, "%d|%s"),
67     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
68     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
69     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
70     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
71     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
72     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
73     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
74     DO_SETUP:   "%d" % DO_SETUP,
75     DO_CREATE:  "%d" % DO_CREATE,
76     DO_CONNECT: "%d" % DO_CONNECT,
77     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
78     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
79     GET:    "%d|%s" % (GET, "%s|%d|%s"),
80     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
81     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
82     STATUS: "%d|%s" % (STATUS, "%d"),
83     GUIDS:  "%d" % GUIDS,
84     })
85
86 def get_type(value):
87     if isinstance(value, bool):
88         return BOOL
89     elif isinstance(value, int):
90         return INTEGER
91     elif isinstance(value, float):
92         return FLOAT
93     else:
94         return STRING
95
96 def set_type(type, value):
97     if type == INTEGER:
98         value = int(value)
99     elif type == FLOAT:
100         value = float(value)
101     elif type == BOOL:
102         value = bool(value)
103     else:
104         value = str(value)
105     return value
106
107 class AccessConfiguration(AttributesMap):
108     MODE_SINGLE_PROCESS = "SINGLE"
109     MODE_DAEMON = "DAEMON"
110     ACCESS_SSH = "SSH"
111     ACCESS_LOCAL = "LOCAL"
112
113     def __init__(self):
114         super(AccessConfiguration, self).__init__()
115         self.add_attribute(name = "mode",
116                 help = "Instance execution mode",
117                 type = Attribute.ENUM,
118                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
119                 allowed = [AccessConfiguration.MODE_DAEMON,
120                     AccessConfiguration.MODE_SINGLE_PROCESS],
121                 validation_function = validation.is_enum)
122         self.add_attribute(name = "communication",
123                 help = "Instance communication mode",
124                 type = Attribute.ENUM,
125                 value = AccessConfiguration.ACCESS_LOCAL,
126                 allowed = [AccessConfiguration.ACCESS_LOCAL,
127                     AccessConfiguration.ACCESS_SSH],
128                 validation_function = validation.is_enum)
129         self.add_attribute(name = "host",
130                 help = "Host where the testbed will be executed",
131                 type = Attribute.STRING,
132                 value = "localhost",
133                 validation_function = validation.is_string)
134         self.add_attribute(name = "user",
135                 help = "User on the Host to execute the testbed",
136                 type = Attribute.STRING,
137                 validation_function = validation.is_string)
138         self.add_attribute(name = "port",
139                 help = "Port on the Host",
140                 type = Attribute.INTEGER,
141                 value = 22,
142                 validation_function = validation.is_integer)
143         self.add_attribute(name = "rootDirectory",
144                 help = "Root directory for storing process files",
145                 type = Attribute.STRING,
146                 value = ".",
147                 validation_function = validation.is_string) # TODO: validation.is_path
148         self.add_attribute(name = "useAgent",
149                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
150                 type = Attribute.BOOL,
151                 value = False,
152                 validation_function = validation.is_bool)
153
154 def create_controller(xml, access_config = None):
155     mode = None if not access_config else access_config.get_attribute_value("mode")
156     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
157         from nepi.core.execute import ExperimentController
158         return ExperimentController(xml)
159     elif mode ==AccessConfiguration.MODE_DAEMON:
160         root_dir = access_config.get_attribute_value("rootDirectory")
161         return ExperimentControllerProxy(xml, root_dir)
162     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
163
164 def create_testbed_instance(testbed_id, testbed_version, access_config):
165     mode = None if not access_config else access_config.get_attribute_value("mode")
166     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
167         return  _build_testbed_testbed(testbed_id, testbed_version)
168     elif mode == AccessConfiguration.MODE_DAEMON:
169                 root_dir = access_config.get_attribute_value("rootDirectory")
170                 return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
171     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
172
173 def _build_testbed_testbed(testbed_id, testbed_version):
174     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
175     if not mod_name in sys.modules:
176         __import__(mod_name)
177     module = sys.modules[mod_name]
178     return module.TestbedInstance(testbed_version)
179
180 class TestbedInstanceServer(server.Server):
181     def __init__(self, testbed_id, testbed_version, root_dir):
182         super(TestbedInstanceServer, self).__init__(root_dir)
183         self._testbed_id = testbed_id
184         self._testbed_version = testbed_version
185         self._testbed = None
186
187     def post_daemonize(self):
188         self._testbed = _build_testbed_testbed(self._testbed_id, 
189                 self._testbed_version)
190
191     def reply_action(self, msg):
192         params = msg.split("|")
193         instruction = int(params[0])
194         try:
195             if instruction == TRACE:
196                 return self.trace(params)
197             elif instruction == START:
198                 return self.start(params)
199             elif instruction == STOP:
200                 return self.stop(params)
201             elif instruction == SHUTDOWN:
202                 return self.shutdown(params)
203             elif instruction == CONFIGURE:
204                 return self.configure(params)
205             elif instruction == CREATE:
206                 return self.create(params)
207             elif instruction == CREATE_SET:
208                 return self.create_set(params)
209             elif instruction == FACTORY_SET:
210                 return self.factory_set(params)
211             elif instruction == CONNECT:
212                 return self.connect(params)
213             elif instruction == CROSS_CONNECT:
214                 return self.cross_connect(params)
215             elif instruction == ADD_TRACE:
216                 return self.add_trace(params)
217             elif instruction == ADD_ADDRESS:
218                 return self.add_address(params)
219             elif instruction == ADD_ROUTE:
220                 return self.add_route(params)
221             elif instruction == DO_SETUP:
222                 return self.do_setup(params)
223             elif instruction == DO_CREATE:
224                 return self.do_create(params)
225             elif instruction == DO_CONNECT:
226                 return self.do_connect(params)
227             elif instruction == DO_CONFIGURE:
228                 return self.do_configure(params)
229             elif instruction == DO_CROSS_CONNECT:
230                 return self.do_cross_connect(params)
231             elif instruction == GET:
232                 return self.get(params)
233             elif instruction == SET:
234                 return self.set(params)
235             elif instruction == ACTION:
236                 return self.action(params)
237             elif instruction == STATUS:
238                 return self.status(params)
239             elif instruction == GUIDS:
240                 return self.guids(params)
241             else:
242                 error = "Invalid instruction %s" % instruction
243                 self.log_error(error)
244                 result = base64.b64encode(error)
245                 return "%d|%s" % (ERROR, result)
246         except:
247             error = self.log_error()
248             result = base64.b64encode(error)
249             return "%d|%s" % (ERROR, result)
250
251     def guids(self, params):
252         guids = self._testbed.guids
253         guids = ",".join(map(str, guids))
254         result = base64.b64encode(guids)
255         return "%d|%s" % (OK, result)
256
257     def create(self, params):
258         guid = int(params[1])
259         factory_id = params[2]
260         self._testbed.create(guid, factory_id)
261         return "%d|%s" % (OK, "")
262
263     def trace(self, params):
264         guid = int(params[1])
265         trace_id = params[2]
266         trace = self._testbed.trace(guid, trace_id)
267         result = base64.b64encode(trace)
268         return "%d|%s" % (OK, result)
269
270     def start(self, params):
271         time = params[1]
272         self._testbed.start(time)
273         return "%d|%s" % (OK, "")
274
275     def stop(self, params):
276         time = params[1]
277         self._testbed.stop(time)
278         return "%d|%s" % (OK, "")
279
280     def shutdown(self, params):
281         self._testbed.shutdown()
282         return "%d|%s" % (OK, "")
283
284     def configure(self, params):
285         name = base64.b64decode(params[1])
286         value = base64.b64decode(params[2])
287         type = int(params[3])
288         value = set_type(type, value)
289         self._testbed.configure(name, value)
290         return "%d|%s" % (OK, "")
291
292     def create_set(self, params):
293         guid = int(params[1])
294         name = base64.b64decode(params[2])
295         value = base64.b64decode(params[3])
296         type = int(params[4])
297         value = set_type(type, value)
298         self._testbed.create_set(guid, name, value)
299         return "%d|%s" % (OK, "")
300
301     def factory_set(self, params):
302         name = base64.b64decode(params[1])
303         value = base64.b64decode(params[2])
304         type = int(params[3])
305         value = set_type(type, value)
306         self._testbed.factory_set(name, value)
307         return "%d|%s" % (OK, "")
308
309     def connect(self, params):
310         guid1 = int(params[1])
311         connector_type_name1 = params[2]
312         guid2 = int(params[3])
313         connector_type_name2 = params[4]
314         self._testbed.connect(guid1, connector_type_name1, guid2, 
315             connector_type_name2)
316         return "%d|%s" % (OK, "")
317
318     def cross_connect(self, params):
319         guid = int(params[1])
320         connector_type_name = params[2]
321         cross_guid = int(params[3])
322         connector_type_name = params[4]
323         cross_guid = int(params[5])
324         cross_testbed_id = params[6]
325         cross_factory_id = params[7]
326         cross_connector_type_name = params[8]
327         self._testbed.cross_connect(guid, connector_type_name, cross_guid, 
328             cross_testbed_id, cross_factory_id, cross_connector_type_name)
329         return "%d|%s" % (OK, "")
330
331     def add_trace(self, params):
332         guid = int(params[1])
333         trace_id = params[2]
334         self._testbed.add_trace(guid, trace_id)
335         return "%d|%s" % (OK, "")
336
337     def add_address(self, params):
338         guid = int(params[1])
339         family = int(params[2])
340         address = params[3]
341         netprefix = int(params[4])
342         broadcast = params[5]
343         self._testbed.add_address(guid, family, address, netprefix,
344                 broadcast)
345         return "%d|%s" % (OK, "")
346
347     def add_route(self, params):
348         guid = int(params[1])
349         destination = params[2]
350         netprefix = int(params[3])
351         nexthop = params[4]
352         self._testbed.add_route(guid, destination, netprefix, nexthop)
353         return "%d|%s" % (OK, "")
354
355     def do_setup(self, params):
356         self._testbed.do_setup()
357         return "%d|%s" % (OK, "")
358
359     def do_create(self, params):
360         self._testbed.do_create()
361         return "%d|%s" % (OK, "")
362
363     def do_connect(self, params):
364         self._testbed.do_connect()
365         return "%d|%s" % (OK, "")
366
367     def do_configure(self, params):
368         self._testbed.do_configure()
369         return "%d|%s" % (OK, "")
370
371     def do_cross_connect(self, params):
372         self._testbed.do_cross_connect()
373         return "%d|%s" % (OK, "")
374
375     def get(self, params):
376         time = params[1]
377         guid = int(param[2] )
378         name = base64.b64decode(params[3])
379         value = self._testbed.get(time, guid, name)
380         result = base64.b64encode(str(value))
381         return "%d|%s" % (OK, result)
382
383     def set(self, params):
384         time = params[1]
385         guid = int(params[2])
386         name = base64.b64decode(params[3])
387         value = base64.b64decode(params[4])
388         type = int(params[3])
389         value = set_type(type, value)
390         self._testbed.set(time, guid, name, value)
391         return "%d|%s" % (OK, "")
392
393     def action(self, params):
394         time = params[1]
395         guid = int(params[2])
396         command = base64.b64decode(params[3])
397         self._testbed.action(time, guid, command)
398         return "%d|%s" % (OK, "")
399
400     def status(self, params):
401         guid = int(params[1])
402         status = self._testbed.status(guid)
403         result = base64.b64encode(str(status))
404         return "%d|%s" % (OK, result)
405  
406 class ExperimentControllerServer(server.Server):
407     def __init__(self, experiment_xml, root_dir):
408         super(ExperimentControllerServer, self).__init__(root_dir)
409         self._experiment_xml = experiment_xml
410         self._controller = None
411
412     def post_daemonize(self):
413         from nepi.core.execute import ExperimentController
414         self._controller = ExperimentController(self._experiment_xml)
415
416     def reply_action(self, msg):
417         params = msg.split("|")
418         instruction = int(params[0])
419         try:
420             if instruction == XML:
421                 return self.experiment_xml(params)
422             elif instruction == ACCESS:
423                 return self.set_access_configuration(params)
424             elif instruction == TRACE:
425                 return self.trace(params)
426             elif instruction == FINISHED:
427                 return self.is_finished(params)
428             elif instruction == START:
429                 return self.start(params)
430             elif instruction == STOP:
431                 return self.stop(params)
432             elif instruction == SHUTDOWN:
433                 return self.shutdown(params)
434             else:
435                 error = "Invalid instruction %s" % instruction
436                 self.log_error(error)
437                 result = base64.b64encode(error)
438                 return "%d|%s" % (ERROR, result)
439         except:
440             error = self.log_error()
441             result = base64.b64encode(error)
442             return "%d|%s" % (ERROR, result)
443
444     def experiment_xml(self, params):
445         xml = self._controller.experiment_xml
446         result = base64.b64encode(xml)
447         return "%d|%s" % (OK, result)
448
449     def set_access_configuration(self, params):
450         testbed_guid = int(params[1])
451         mode = params[2]
452         communication = params[3]
453         host = params[4]
454         user = params[5]
455         port = int(params[6])
456         root_dir = params[7]
457         use_agent = bool(params[8])
458         access_config = AccessConfiguration()
459         access_config.set_attribute_value("mode", mode)
460         access_config.set_attribute_value("communication", communication)
461         access_config.set_attribute_value("host", host)
462         access_config.set_attribute_value("user", user)
463         access_config.set_attribute_value("port", port)
464         access_config.set_attribute_value("rootDirectory", root_dir)
465         access_config.set_attribute_value("useAgent", use_agent)
466         self._controller.set_access_configuration(testbed_guid, 
467                 access_config)
468         return "%d|%s" % (OK, "")
469
470     def trace(self, params):
471         testbed_guid = int(params[1])
472         guid = int(params[2])
473         trace_id = params[3]
474         trace = self._controller.trace(testbed_guid, guid, trace_id)
475         result = base64.b64encode(trace)
476         return "%d|%s" % (OK, "%s" % result)
477
478     def is_finished(self, params):
479         guid = int(params[1])
480         result = self._controller.is_finished(guid)
481         return "%d|%s" % (OK, "%r" % result)
482
483     def start(self, params):
484         self._controller.start()
485         return "%d|%s" % (OK, "")
486
487     def stop(self, params):
488         self._controller.stop()
489         return "%d|%s" % (OK, "")
490
491     def shutdown(self, params):
492         self._controller.shutdown()
493         return "%d|%s" % (OK, "")
494
495 class TestbedIntanceProxy(object):
496     def __init__(self, testbed_id, testbed_version, root_dir):
497         # launch daemon
498         s = TestbedInstanceServer(testbed_id, testbed_version, 
499                 root_dir)
500         s.run()
501         # create_client
502         self._client = server.Client(root_dir)
503
504     @property
505     def guids(self):
506         msg = testbed_messages[GUIDS]
507         self._client.send_msg(msg)
508         reply = self._client.read_reply()
509         result = reply.split("|")
510         code = int(result[0])
511         text = base64.b64decode(result[1])
512         if code == ERROR:
513             raise RuntimeError(text)
514         return map(int, text.split(","))
515
516     def configure(self, name, value):
517         msg = testbed_messages[CONFIGURE]
518         type = get_type(value)
519         # avoid having "|" in this parameters
520         name = base64.b64encode(name)
521         value = base64.b64encode(str(value))
522         msg = msg % (name, value, type)
523         self._client.send_msg(msg)
524         reply = self._client.read_reply()
525         result = reply.split("|")
526         code = int(result[0])
527         text = base64.b64decode(result[1])
528         if code == ERROR:
529             raise RuntimeError(text)
530
531     def create(self, guid, factory_id):
532         msg = testbed_messages[CREATE]
533         msg = msg % (guid, factory_id)
534         self._client.send_msg(msg)
535         reply = self._client.read_reply()
536         result = reply.split("|")
537         code = int(result[0])
538         text = base64.b64decode(result[1])
539         if code == ERROR:
540             raise RuntimeError(text)
541
542     def create_set(self, guid, name, value):
543         msg = testbed_messages[CREATE_SET]
544         type = get_type(value)
545         # avoid having "|" in this parameters
546         name = base64.b64encode(name)
547         value = base64.b64encode(str(value))
548         msg = msg % (guid, name, value, type)
549         self._client.send_msg(msg)
550         reply = self._client.read_reply()
551         result = reply.split("|")
552         code = int(result[0])
553         text = base64.b64decode(result[1])
554         if code == ERROR:
555             raise RuntimeError(text)
556
557     def factory_set(self, guid, name, value):
558         msg = testbed_messages[FACTORY_SET]
559         type = get_type(value)
560         # avoid having "|" in this parameters
561         name = base64.b64encode(name)
562         value = base64.b64encode(str(value))
563         msg = msg % (guid, name, value, type)
564         self._client.send_msg(msg)
565         reply = self._client.read_reply()
566         result = reply.split("|")
567         code = int(result[0])
568         text = base64.b64decode(result[1])
569         if code == ERROR:
570             raise RuntimeError(text)
571
572     def connect(self, guid1, connector_type_name1, guid2, 
573             connector_type_name2): 
574         msg = testbed_messages[CONNECT]
575         msg = msg % (guid1, connector_type_name1, guid2, 
576             connector_type_name2)
577         self._client.send_msg(msg)
578         reply = self._client.read_reply()
579         result = reply.split("|")
580         code = int(result[0])
581         text = base64.b64decode(result[1])
582         if code == ERROR:
583             raise RuntimeError(text)
584
585     def cross_connect(self, guid, connector_type_name, cross_guid, 
586             cross_testbed_id, cross_factory_id, cross_connector_type_name):
587         msg = testbed_messages[CROSS_CONNECT]
588         msg = msg % (guid, connector_type_name, cross_guid, 
589             cross_testbed_id, cross_factory_id, cross_connector_type_name)
590         self._client.send_msg(msg)
591         reply = self._client.read_reply()
592         result = reply.split("|")
593         code = int(result[0])
594         text = base64.b64decode(result[1])
595         if code == ERROR:
596             raise RuntimeError(text)
597
598     def add_trace(self, guid, trace_id):
599         msg = testbed_messages[ADD_TRACE]
600         msg = msg % (guid, trace_id)
601         self._client.send_msg(msg)
602         reply = self._client.read_reply()
603         result = reply.split("|")
604         code = int(result[0])
605         text = base64.b64decode(result[1])
606         if code == ERROR:
607             raise RuntimeError(text)
608
609     def add_address(self, guid, family, address, netprefix, broadcast): 
610         msg = testbed_messages[ADD_ADDRESS]
611         msg = msg % (guid, family, address, netprefix, broadcast)
612         self._client.send_msg(msg)
613         reply = self._client.read_reply()
614         result = reply.split("|")
615         code = int(result[0])
616         text = base64.b64decode(result[1])
617         if code == ERROR:
618             raise RuntimeError(text)
619
620     def add_route(self, guid, destination, netprefix, nexthop):
621         msg = testbed_messages[ADD_ROUTE]
622         msg = msg % (guid, destination, netprefix, nexthop)
623         self._client.send_msg(msg)
624         reply = self._client.read_reply()
625         result = reply.split("|")
626         code = int(result[0])
627         text = base64.b64decode(result[1])
628         if code == ERROR:
629             raise RuntimeError(text)
630
631     def do_setup(self):
632         msg = testbed_messages[DO_SETUP]
633         self._client.send_msg(msg)
634         reply = self._client.read_reply()
635         result = reply.split("|")
636         code = int(result[0])
637         text = base64.b64decode(result[1])
638         if code == ERROR:
639             raise RuntimeError(text)
640
641     def do_create(self):
642         msg = testbed_messages[DO_CREATE]
643         self._client.send_msg(msg)
644         reply = self._client.read_reply()
645         result = reply.split("|")
646         code = int(result[0])
647         text = base64.b64decode(result[1])
648         if code == ERROR:
649             raise RuntimeError(text)
650
651     def do_connect(self):
652         msg = testbed_messages[DO_CONNECT]
653         self._client.send_msg(msg)
654         reply = self._client.read_reply()
655         result = reply.split("|")
656         code = int(result[0])
657         text = base64.b64decode(result[1])
658         if code == ERROR:
659             raise RuntimeError(text)
660
661     def do_configure(self):
662         msg = testbed_messages[DO_CONFIGURE]
663         self._client.send_msg(msg)
664         reply = self._client.read_reply()
665         result = reply.split("|")
666         code = int(result[0])
667         text = base64.b64decode(result[1])
668         if code == ERROR:
669             raise RuntimeError(text)
670
671     def do_cross_connect(self):
672         msg = testbed_messages[DO_CROSS_CONNECT]
673         self._client.send_msg(msg)
674         reply = self._client.read_reply()
675         result = reply.split("|")
676         code = int(result[0])
677         text = base64.b64decode(result[1])
678         if code == ERROR:
679             raise RuntimeError(text)
680
681     def start(self, time = TIME_NOW):
682         msg = testbed_messages[START]
683         msg = msg % (time)
684         self._client.send_msg(msg)
685         reply = self._client.read_reply()
686         result = reply.split("|")
687         code = int(result[0])
688         text = base64.b64decode(result[1])
689         if code == ERROR:
690             raise RuntimeError(text)
691
692     def stop(self, time = TIME_NOW):
693         msg = testbed_messages[STOP]
694         msg = msg % (time)
695         self._client.send_msg(msg)
696         reply = self._client.read_reply()
697         result = reply.split("|")
698         code = int(result[0])
699         text = base64.b64decode(result[1])
700         if code == ERROR:
701             raise RuntimeError(text)
702
703     def set(self, time, guid, name, value):
704         msg = testbed_messages[SET]
705         type = get_type(value)
706         # avoid having "|" in this parameters
707         name = base64.b64encode(name)
708         value = base64.b64encode(str(value))
709         msg = msg % (time, guid, name, value, type)
710         self._client.send_msg(msg)
711         reply = self._client.read_reply()
712         result = reply.split("|")
713         code = int(result[0])
714         text = base64.b64decode(result[1])
715         if code == ERROR:
716             raise RuntimeError(text)
717
718     def get(self, time, guid, name):
719         msg = testbed_messages[GET]
720         # avoid having "|" in this parameters
721         name = base64.b64encode(name)
722         msg = msg % (time, guid, name)
723         self._client.send_msg(msg)
724         reply = self._client.read_reply()
725         result = reply.split("|")
726         code = int(result[0])
727         text = base64.b64decode(result[1])
728         if code == ERROR:
729             raise RuntimeError(text)
730         return text
731
732     def action(self, time, guid, action):
733         msg = testbed_messages[ACTION]
734         msg = msg % (time, guid, action)
735         self._client.send_msg(msg)
736         reply = self._client.read_reply()
737         result = reply.split("|")
738         code = int(result[0])
739         text = base64.b64decode(result[1])
740         if code == ERROR:
741             raise RuntimeError(text)
742
743     def status(self, guid):
744         msg = testbed_messages[STATUS]
745         msg = msg % (guid)
746         self._client.send_msg(msg)
747         reply = self._client.read_reply()
748         result = reply.split("|")
749         code = int(result[0])
750         text = base64.b64decode(result[1])
751         if code == ERROR:
752             raise RuntimeError(text)
753         return int(text)
754
755     def trace(self, guid, trace_id):
756         msg = testbed_messages[TRACE]
757         msg = msg % (guid, trace_id)
758         self._client.send_msg(msg)
759         reply = self._client.read_reply()
760         result = reply.split("|")
761         code = int(result[0])
762         text = base64.b64decode(result[1])
763         if code == ERROR:
764             raise RuntimeError(text)
765         return text
766
767     def shutdown(self):
768         msg = testbed_messages[SHUTDOWN]
769         self._client.send_msg(msg)
770         reply = self._client.read_reply()
771         result = reply.split("|")
772         code = int(result[0])
773         text = base64.b64decode(result[1])
774         if code == ERROR:
775             raise RuntimeError(text)
776         self._client.send_stop()
777
778 class ExperimentControllerProxy(object):
779     def __init__(self, experiment_xml, root_dir):
780         # launch daemon
781         s = ExperimentControllerServer(experiment_xml, root_dir)
782         s.run()
783         # create_client
784         self._client = server.Client(root_dir)
785
786     @property
787     def experiment_xml(self):
788         msg = controller_messages[XML]
789         self._client.send_msg(msg)
790         reply = self._client.read_reply()
791         result = reply.split("|")
792         code = int(result[0])
793         text =  base64.b64decode(result[1])
794         if code == OK:
795             return text
796         raise RuntimeError(text)
797
798     def set_access_configuration(self, testbed_guid, access_config):
799         mode = access_config.get_attribute_value("mode")
800         communication = access_config.get_attribute_value("communication")
801         host = access_config.get_attribute_value("host")
802         user = access_config.get_attribute_value("user")
803         port = access_config.get_attribute_value("port")
804         root_dir = access_config.get_attribute_value("rootDirectory")
805         use_agent = access_config.get_attribute_value("useAgent")
806         msg = controller_messages[ACCESS]
807         msg = msg % (testbed_guid, mode, communication, host, user, port, 
808                 root_dir, use_agent)
809         self._client.send_msg(msg)
810         reply = self._client.read_reply()
811         result = reply.split("|")
812         code = int(result[0])
813         text =  base64.b64decode(result[1])
814         if code == ERROR:
815             raise RuntimeError(text)
816
817     def trace(self, testbed_guid, guid, trace_id):
818         msg = controller_messages[TRACE]
819         msg = msg % (testbed_guid, guid, trace_id)
820         self._client.send_msg(msg)
821         reply = self._client.read_reply()
822         result = reply.split("|")
823         code = int(result[0])
824         text =  base64.b64decode(result[1])
825         if code == OK:
826             return text
827         raise RuntimeError(text)
828
829     def start(self):
830         msg = controller_messages[START]
831         self._client.send_msg(msg)
832         reply = self._client.read_reply()
833         result = reply.split("|")
834         code = int(result[0])
835         text =  base64.b64decode(result[1])
836         if code == ERROR:
837             raise RuntimeError(text)
838
839     def stop(self):
840         msg = controller_messages[STOP]
841         self._client.send_msg(msg)
842         reply = self._client.read_reply()
843         result = reply.split("|")
844         code = int(result[0])
845         text =  base64.b64decode(result[1])
846         if code == ERROR:
847             raise RuntimeError(text)
848
849     def is_finished(self, guid):
850         msg = controller_messages[FINISHED]
851         msg = msg % guid
852         self._client.send_msg(msg)
853         reply = self._client.read_reply()
854         result = reply.split("|")
855         code = int(result[0])
856         if code == OK:
857             return bool(result[1])
858         error =  base64.b64decode(result[1])
859         raise RuntimeError(error)
860
861     def shutdown(self):
862         msg = controller_messages[SHUTDOWN]
863         self._client.send_msg(msg)
864         reply = self._client.read_reply()
865         result = reply.split("|")
866         code = int(result[0])
867         text =  base64.b64decode(result[1])
868         if code == ERROR:
869             raise RuntimeError(text)
870         self._client.send_stop()
871