e3d6edbc0d1120e400bb7d0ed4525fc43e55854d
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import sys
9
# PROTOCOL REPLIES
# Status code placed in front of every reply: "<code>|<base64 text>".
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
# Instruction code placed in front of every request message.
XML = 2
ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27

# PARAMETER TYPE
# Tag sent next to a serialized attribute value so that the receiver can
# convert the text back to the right Python type (see get_type/set_type).
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103
47
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Message templates understood by ExperimentControllerServer. Each template
# starts with the instruction code; the remaining "|"-separated fields are
# filled in by the sender with the % operator.
controller_messages = {
    XML: str(XML),
    # testbed_guid | mode | communication | host | user | port |
    # root_dir | use_agent
    ACCESS: str(ACCESS) + "|" + "%d|%s|%s|%s|%s|%d|%s|%r",
    # testbed_guid | guid | trace_id
    TRACE: str(TRACE) + "|" + "%d|%d|%s",
    # guid
    FINISHED: str(FINISHED) + "|" + "%d",
    START: str(START),
    STOP: str(STOP),
    SHUTDOWN: str(SHUTDOWN),
}
58
# TESTBED INSTANCE PROTOCOL MESSAGES
# Message templates understood by TestbedInstanceServer. Each template starts
# with the instruction code; the remaining "|"-separated fields are filled in
# by TestbedIntanceProxy with the % operator. Attribute names and values are
# base64-encoded by the sender so that they cannot contain a "|".
testbed_messages = {
    # guid | trace_id
    TRACE: "%d|%s" % (TRACE, "%d|%s"),
    # time
    START: "%d|%s" % (START, "%s"),
    # time
    STOP: "%d|%s" % (STOP, "%s"),
    SHUTDOWN: "%d" % SHUTDOWN,
    # name | value | type
    CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
    # guid | factory_id
    CREATE: "%d|%s" % (CREATE, "%d|%s"),
    # guid | name | value | type
    CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
    # guid | name | value | type
    FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
    # guid1 | connector_type_name1 | guid2 | connector_type_name2
    CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
    # guid | connector_type_name | cross_guid | cross_testbed_id |
    # cross_factory_id | cross_connector_type_name
    # cross_testbed_id is formatted with %s: testbed ids are strings
    # elsewhere in this module, and %s renders integers unchanged.
    CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%s|%s|%s"),
    # guid | trace_id
    ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
    # guid | family | address | netprefix | broadcast
    ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
    # guid | destination | netprefix | nexthop
    ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
    DO_SETUP: "%d" % DO_SETUP,
    DO_CREATE: "%d" % DO_CREATE,
    DO_CONNECT: "%d" % DO_CONNECT,
    DO_CONFIGURE: "%d" % DO_CONFIGURE,
    DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
    # time | guid | name
    GET: "%d|%s" % (GET, "%s|%d|%s"),
    # time | guid | name | value | type
    SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
    # time | guid | action
    ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
    # guid
    STATUS: "%d|%s" % (STATUS, "%d"),
    GUIDS: "%d" % GUIDS,
}
85
# Human readable name of every protocol code (replies, instructions and
# parameter types); used when writing log entries.
instruction_text = {
    # replies
    OK: "OK",
    ERROR: "ERROR",
    # instructions
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT: "DO_CONNECT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
    GET: "GET",
    SET: "SET",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    # parameter types
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
}
120
def get_type(value):
    """Return the protocol type tag (BOOL, INTEGER, FLOAT or STRING) of value.

    Anything that is not a bool, an int or a float is tagged as STRING.
    """
    # bool has to be probed before int: bool is a subclass of int.
    candidates = ((BOOL, bool), (INTEGER, int), (FLOAT, float))
    for tag, python_type in candidates:
        if isinstance(value, python_type):
            return tag
    return STRING
130
def set_type(type, value):
    """Convert value to the Python type named by the protocol tag type.

    INTEGER and FLOAT go through int() and float(); BOOL is true only for
    the exact text "True"; any other tag yields str(value).
    """
    if type == BOOL:
        return value == "True"
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    return str(value)
141
def log_msg(server, params):
    """Write a debug log entry describing a received instruction message.

    params is the message already split on "|": params[0] is the instruction
    code and the rest are its arguments. The entry is emitted through
    server.log_debug and is prefixed with the server's class name.
    """
    instruction_name = instruction_text[int(params[0])]
    arguments = ", ".join([str(param) for param in params[1:]])
    owner = server.__class__.__name__
    server.log_debug("%s - msg: %s [%s]" % (owner, instruction_name,
        arguments))
147
def log_reply(server, reply):
    """Write a debug log entry describing a reply about to be sent.

    reply has the form "<code>|<base64 text>"; the code is logged by name
    and the text is logged decoded, through server.log_debug.
    """
    fields = reply.split("|")
    status_name = instruction_text[int(fields[0])]
    decoded_text = base64.b64decode(fields[1])
    owner = server.__class__.__name__
    server.log_debug("%s - reply: %s %s" % (owner, status_name,
        decoded_text))
155
class AccessConfiguration(AttributesMap):
    """Attribute map describing how an instance is executed and reached.

    Registers the attributes "mode", "communication", "host", "user",
    "port", "rootDirectory", "useAgent" and "logLevel" with their defaults.
    """

    # allowed values for "mode"
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    # allowed values for "communication"
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    # allowed values for "logLevel"
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        super(AccessConfiguration, self).__init__()
        cls = AccessConfiguration
        # One dict of add_attribute() keyword arguments per attribute,
        # registered in this order.
        attribute_specs = (
            dict(name = "mode",
                help = "Instance execution mode",
                type = Attribute.ENUM,
                value = cls.MODE_SINGLE_PROCESS,
                allowed = [cls.MODE_DAEMON, cls.MODE_SINGLE_PROCESS],
                validation_function = validation.is_enum),
            dict(name = "communication",
                help = "Instance communication mode",
                type = Attribute.ENUM,
                value = cls.ACCESS_LOCAL,
                allowed = [cls.ACCESS_LOCAL, cls.ACCESS_SSH],
                validation_function = validation.is_enum),
            dict(name = "host",
                help = "Host where the testbed will be executed",
                type = Attribute.STRING,
                value = "localhost",
                validation_function = validation.is_string),
            # "user" is the only attribute registered without a default
            dict(name = "user",
                help = "User on the Host to execute the testbed",
                type = Attribute.STRING,
                validation_function = validation.is_string),
            dict(name = "port",
                help = "Port on the Host",
                type = Attribute.INTEGER,
                value = 22,
                validation_function = validation.is_integer),
            # TODO: validation.is_path
            dict(name = "rootDirectory",
                help = "Root directory for storing process files",
                type = Attribute.STRING,
                value = ".",
                validation_function = validation.is_string),
            dict(name = "useAgent",
                help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
                type = Attribute.BOOL,
                value = False,
                validation_function = validation.is_bool),
            dict(name = "logLevel",
                help = "Log level for instance",
                type = Attribute.ENUM,
                value = cls.ERROR_LEVEL,
                allowed = [cls.ERROR_LEVEL, cls.DEBUG_LEVEL],
                validation_function = validation.is_enum),
        )
        for spec in attribute_specs:
            self.add_attribute(**spec)
211
def create_controller(xml, access_config = None):
    """Build an experiment controller for the experiment described by xml.

    Without an access configuration, or when its "mode" is unset or
    MODE_SINGLE_PROCESS, an in-process ExperimentController is returned.
    With MODE_DAEMON an ExperimentControllerProxy is returned, created with
    the configured "rootDirectory" and "logLevel".

    Raises RuntimeError for any other mode.
    """
    mode = None if not access_config else access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        # imported lazily, only when an in-process controller is requested
        from nepi.core.execute import ExperimentController
        return ExperimentController(xml)
    if mode == AccessConfiguration.MODE_DAEMON:
        root_dir = access_config.get_attribute_value("rootDirectory")
        log_level = access_config.get_attribute_value("logLevel")
        return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
    # The message needs a placeholder: "..." % mode without one raises
    # TypeError instead of the intended RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
222
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Build a testbed instance for the given testbed id and version.

    Without an access configuration, or when its "mode" is unset or
    MODE_SINGLE_PROCESS, the testbed instance is built in this process.
    With MODE_DAEMON a TestbedIntanceProxy is returned, created with the
    configured "rootDirectory" and "logLevel".

    Raises RuntimeError for any other mode.
    """
    mode = None if not access_config else access_config.get_attribute_value("mode")
    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        return _build_testbed_testbed(testbed_id, testbed_version)
    if mode == AccessConfiguration.MODE_DAEMON:
        root_dir = access_config.get_attribute_value("rootDirectory")
        log_level = access_config.get_attribute_value("logLevel")
        return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version)
    # The message needs a placeholder: "..." % mode without one raises
    # TypeError instead of the intended RuntimeError.
    raise RuntimeError("Unsupported access configuration mode '%s'" % mode)
233
def _build_testbed_testbed(testbed_id, testbed_version):
    """Instantiate TestbedInstance from the module nepi.testbeds.<testbed_id>.

    The testbed id is lower-cased to form the module name; the module is
    imported only if it is not already present in sys.modules.
    """
    module_name = "nepi.testbeds.%s" % (testbed_id.lower())
    try:
        testbed_module = sys.modules[module_name]
    except KeyError:
        # __import__ of a dotted name returns the top-level package, so the
        # submodule itself is fetched from sys.modules afterwards.
        __import__(module_name)
        testbed_module = sys.modules[module_name]
    return testbed_module.TestbedInstance(testbed_version)
240
class TestbedInstanceServer(server.Server):
    """Server end of the testbed proxy protocol.

    Receives "|"-separated instruction messages (see testbed_messages),
    applies them to a testbed instance built by post_daemonize() and answers
    each one with "<code>|<base64 text>", where <code> is OK or ERROR.
    """

    # Instruction code -> name of the method handling it. Names are resolved
    # with getattr() on the instance, so overriding a handler in a subclass
    # keeps working.
    _HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "configure",
        CREATE: "create",
        CREATE_SET: "create_set",
        FACTORY_SET: "factory_set",
        CONNECT: "connect",
        CROSS_CONNECT: "cross_connect",
        ADD_TRACE: "add_trace",
        ADD_ADDRESS: "add_address",
        ADD_ROUTE: "add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT: "do_connect",
        DO_CONFIGURE: "do_configure",
        DO_CROSS_CONNECT: "do_cross_connect",
        GET: "get",
        SET: "set",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
    }

    def __init__(self, root_dir, testbed_id, testbed_version):
        super(TestbedInstanceServer, self).__init__(root_dir)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # created by post_daemonize()
        self._testbed = None

    def post_daemonize(self):
        """Build the testbed instance served by this server."""
        # presumably invoked by server.Server once the daemon process is
        # running -- TODO confirm against nepi.util.server
        self._testbed = _build_testbed_testbed(self._testbed_id,
                self._testbed_version)

    def reply_action(self, msg):
        """Execute the instruction carried by msg and return the reply.

        msg is "<instruction>|<arg>|..."; the reply is "<code>|<base64 text>".
        Unknown instructions and any exception raised by a handler produce
        an ERROR reply instead of propagating.
        """
        params = msg.split("|")
        instruction = int(params[0])
        log_msg(self, params)
        try:
            handler_name = self._HANDLERS.get(instruction)
            if handler_name is None:
                error = "Invalid instruction %s" % instruction
                self.log_error(error)
                reply = self._error_reply(error)
            else:
                reply = getattr(self, handler_name)(params)
        except:
            # Deliberately broad: whatever goes wrong is reported back to
            # the client. log_error() without arguments is expected to
            # return the text of the error being handled -- TODO confirm.
            error = self.log_error()
            reply = self._error_reply(error)
        log_reply(self, reply)
        return reply

    def _ok_reply(self, text = ""):
        """Return an OK reply carrying text (base64-encoded)."""
        return "%d|%s" % (OK, base64.b64encode(text))

    def _error_reply(self, text):
        """Return an ERROR reply carrying text (base64-encoded)."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def _decode_value(self, encoded_value, type_code):
        """Decode a base64 attribute value and convert it per its type tag."""
        value = base64.b64decode(encoded_value)
        return set_type(int(type_code), value)

    def guids(self, params):
        """Reply with the testbed guids as a comma separated list."""
        guids = ",".join(map(str, self._testbed.guids))
        return self._ok_reply(guids)

    def create(self, params):
        """params: instruction | guid | factory_id"""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.create(guid, factory_id)
        return self._ok_reply()

    def trace(self, params):
        """params: instruction | guid | trace_id; replies with the trace."""
        guid = int(params[1])
        trace_id = params[2]
        trace = self._testbed.trace(guid, trace_id)
        return self._ok_reply(trace)

    def start(self, params):
        """params: instruction | time"""
        time = params[1]
        self._testbed.start(time)
        return self._ok_reply()

    def stop(self, params):
        """params: instruction | time"""
        time = params[1]
        self._testbed.stop(time)
        return self._ok_reply()

    def shutdown(self, params):
        self._testbed.shutdown()
        return self._ok_reply()

    def configure(self, params):
        """params: instruction | name (b64) | value (b64) | type"""
        name = base64.b64decode(params[1])
        value = self._decode_value(params[2], params[3])
        self._testbed.configure(name, value)
        return self._ok_reply()

    def create_set(self, params):
        """params: instruction | guid | name (b64) | value (b64) | type"""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = self._decode_value(params[3], params[4])
        self._testbed.create_set(guid, name, value)
        return self._ok_reply()

    def factory_set(self, params):
        """params: instruction | guid | name (b64) | value (b64) | type"""
        # The FACTORY_SET message starts with the guid (see testbed_messages
        # and TestbedIntanceProxy.factory_set); it used to be skipped, which
        # shifted every following field by one position.
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = self._decode_value(params[3], params[4])
        self._testbed.factory_set(guid, name, value)
        return self._ok_reply()

    def connect(self, params):
        """params: instruction | guid1 | connector1 | guid2 | connector2"""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.connect(guid1, connector_type_name1, guid2,
            connector_type_name2)
        return self._ok_reply()

    def cross_connect(self, params):
        """params: instruction | guid | connector_type_name | cross_guid |
        cross_testbed_id | cross_factory_id | cross_connector_type_name
        """
        # The CROSS_CONNECT message carries exactly these six fields; the
        # indexes follow the order used by testbed_messages.
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        cross_testbed_id = params[4]
        cross_factory_id = params[5]
        cross_connector_type_name = params[6]
        self._testbed.cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
        return self._ok_reply()

    def add_trace(self, params):
        """params: instruction | guid | trace_id"""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.add_trace(guid, trace_id)
        return self._ok_reply()

    def add_address(self, params):
        """params: instruction | guid | family | address | netprefix |
        broadcast
        """
        guid = int(params[1])
        family = int(params[2])
        address = params[3]
        netprefix = int(params[4])
        broadcast = params[5]
        self._testbed.add_address(guid, family, address, netprefix,
                broadcast)
        return self._ok_reply()

    def add_route(self, params):
        """params: instruction | guid | destination | netprefix | nexthop"""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.add_route(guid, destination, netprefix, nexthop)
        return self._ok_reply()

    def do_setup(self, params):
        self._testbed.do_setup()
        return self._ok_reply()

    def do_create(self, params):
        self._testbed.do_create()
        return self._ok_reply()

    def do_connect(self, params):
        self._testbed.do_connect()
        return self._ok_reply()

    def do_configure(self, params):
        self._testbed.do_configure()
        return self._ok_reply()

    def do_cross_connect(self, params):
        self._testbed.do_cross_connect()
        return self._ok_reply()

    def get(self, params):
        """params: instruction | time | guid | name (b64); replies with
        str() of the attribute value.
        """
        time = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = self._testbed.get(time, guid, name)
        return self._ok_reply(str(value))

    def set(self, params):
        """params: instruction | time | guid | name (b64) | value (b64) |
        type
        """
        time = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        # the type tag is the fifth field of the SET message, not the third
        value = self._decode_value(params[4], params[5])
        self._testbed.set(time, guid, name, value)
        return self._ok_reply()

    def action(self, params):
        """params: instruction | time | guid | command (b64)"""
        time = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(time, guid, command)
        return self._ok_reply()

    def status(self, params):
        """params: instruction | guid; replies with str() of the status."""
        guid = int(params[1])
        status = self._testbed.status(guid)
        return self._ok_reply(str(status))
469  
class ExperimentControllerServer(server.Server):
    """Server end of the experiment controller proxy protocol.

    Receives "|"-separated instruction messages (see controller_messages),
    applies them to an ExperimentController built by post_daemonize() and
    answers each one with "<code>|<base64 text>".
    """

    def __init__(self, root_dir, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir)
        self._experiment_xml = experiment_xml
        # created by post_daemonize()
        self._controller = None

    def post_daemonize(self):
        """Build the experiment controller served by this server."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml)

    def reply_action(self, msg):
        """Execute the instruction carried by msg and return the reply.

        Unknown instructions and any exception raised while handling one
        produce an ERROR reply instead of propagating.
        """
        params = msg.split("|")
        instruction = int(params[0])
        log_msg(self, params)
        try:
            handlers = {
                XML: self.experiment_xml,
                ACCESS: self.set_access_configuration,
                TRACE: self.trace,
                FINISHED: self.is_finished,
                START: self.start,
                STOP: self.stop,
                SHUTDOWN: self.shutdown,
            }
            if instruction in handlers:
                reply = handlers[instruction](params)
            else:
                error = "Invalid instruction %s" % instruction
                self.log_error(error)
                reply = "%d|%s" % (ERROR, base64.b64encode(error))
        except:
            # whatever went wrong is reported back to the client
            error = self.log_error()
            reply = "%d|%s" % (ERROR, base64.b64encode(error))
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """Reply with the controller's experiment xml."""
        encoded_xml = base64.b64encode(self._controller.experiment_xml)
        return "%d|%s" % (OK, encoded_xml)

    def set_access_configuration(self, params):
        """params: instruction | testbed_guid | mode | communication | host |
        user | port | root_dir | use_agent
        """
        testbed_guid = int(params[1])
        # (attribute name, value) pairs, applied in this order
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            ("useAgent", params[8] == "True"),
        )
        access_config = AccessConfiguration()
        for attribute_name, attribute_value in settings:
            access_config.set_attribute_value(attribute_name, attribute_value)
        self._controller.set_access_configuration(testbed_guid,
                access_config)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """params: instruction | testbed_guid | guid | trace_id"""
        testbed_guid, guid = int(params[1]), int(params[2])
        trace_id = params[3]
        content = self._controller.trace(testbed_guid, guid, trace_id)
        return "%d|%s" % (OK, base64.b64encode(content))

    def is_finished(self, params):
        """params: instruction | guid; replies with str() of the result."""
        finished = self._controller.is_finished(int(params[1]))
        return "%d|%s" % (OK, base64.b64encode(str(finished)))

    def start(self, params):
        self._controller.start()
        return "%d|%s" % (OK, "")

    def stop(self, params):
        self._controller.stop()
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        self._controller.shutdown()
        return "%d|%s" % (OK, "")
562
class TestbedIntanceProxy(object):
    """Client end of the testbed proxy protocol.

    Offers the testbed instance operations as methods; each call is turned
    into a testbed_messages message, sent through a server.Client and the
    "<code>|<base64 text>" reply is decoded. An ERROR reply is raised as
    RuntimeError carrying the decoded text.
    """

    def __init__(self, root_dir, log_level, testbed_id = None,
            testbed_version = None, launch = True):
        """Create the client and, when launch is true, start the server.

        Raises RuntimeError if launch is requested without both testbed_id
        and testbed_version.
        """
        if launch:
            if testbed_id is None or testbed_version is None:
                raise RuntimeError("To launch a TestbedInstance server a "
                        "testbed_id and testbed_version are required")
            # launch daemon
            s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
            if log_level == AccessConfiguration.DEBUG_LEVEL:
                s.set_debug_log_level()
            s.run()
        # create_client
        self._client = server.Client(root_dir)

    def _request(self, msg):
        """Send msg, wait for the reply and return its decoded text.

        Raises RuntimeError with the decoded text when the reply code is
        ERROR.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        if code == ERROR:
            raise RuntimeError(text)
        return text

    def _encode_attribute(self, name, value):
        """Return (name, value, type tag) ready to be put in a message.

        name and str(value) are base64-encoded to avoid having "|" in
        these parameters.
        """
        type = get_type(value)
        return base64.b64encode(name), base64.b64encode(str(value)), type

    @property
    def guids(self):
        """List of the guids (as int) reported by the server."""
        text = self._request(testbed_messages[GUIDS])
        # an empty reply means no guids; int("") would raise ValueError
        return [int(guid) for guid in text.split(",") if guid]

    def configure(self, name, value):
        name, value, type = self._encode_attribute(name, value)
        self._request(testbed_messages[CONFIGURE] % (name, value, type))

    def create(self, guid, factory_id):
        self._request(testbed_messages[CREATE] % (guid, factory_id))

    def create_set(self, guid, name, value):
        name, value, type = self._encode_attribute(name, value)
        self._request(testbed_messages[CREATE_SET] % (guid, name, value,
            type))

    def factory_set(self, guid, name, value):
        name, value, type = self._encode_attribute(name, value)
        self._request(testbed_messages[FACTORY_SET] % (guid, name, value,
            type))

    def connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        self._request(testbed_messages[CONNECT] % (guid1,
            connector_type_name1, guid2, connector_type_name2))

    def cross_connect(self, guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name):
        self._request(testbed_messages[CROSS_CONNECT] % (guid,
            connector_type_name, cross_guid, cross_testbed_id,
            cross_factory_id, cross_connector_type_name))

    def add_trace(self, guid, trace_id):
        self._request(testbed_messages[ADD_TRACE] % (guid, trace_id))

    def add_address(self, guid, family, address, netprefix, broadcast):
        self._request(testbed_messages[ADD_ADDRESS] % (guid, family, address,
            netprefix, broadcast))

    def add_route(self, guid, destination, netprefix, nexthop):
        self._request(testbed_messages[ADD_ROUTE] % (guid, destination,
            netprefix, nexthop))

    def do_setup(self):
        self._request(testbed_messages[DO_SETUP])

    def do_create(self):
        self._request(testbed_messages[DO_CREATE])

    def do_connect(self):
        self._request(testbed_messages[DO_CONNECT])

    def do_configure(self):
        self._request(testbed_messages[DO_CONFIGURE])

    def do_cross_connect(self):
        self._request(testbed_messages[DO_CROSS_CONNECT])

    def start(self, time = TIME_NOW):
        self._request(testbed_messages[START] % (time))

    def stop(self, time = TIME_NOW):
        self._request(testbed_messages[STOP] % (time))

    def set(self, time, guid, name, value):
        name, value, type = self._encode_attribute(name, value)
        self._request(testbed_messages[SET] % (time, guid, name, value,
            type))

    def get(self, time, guid, name):
        """Return the attribute value as the text sent back by the server."""
        # avoid having "|" in this parameters
        name = base64.b64encode(name)
        return self._request(testbed_messages[GET] % (time, guid, name))

    def action(self, time, guid, action):
        # TestbedInstanceServer.action base64-decodes the command, so it
        # has to be encoded here (this also keeps "|" out of the message).
        action = base64.b64encode(action)
        self._request(testbed_messages[ACTION] % (time, guid, action))

    def status(self, guid):
        """Return the status reported by the server, converted to int."""
        return int(self._request(testbed_messages[STATUS] % (guid)))

    def trace(self, guid, trace_id):
        """Return the content of the trace as sent back by the server."""
        return self._request(testbed_messages[TRACE] % (guid, trace_id))

    def shutdown(self):
        """Shut the remote testbed down, then tell the server to stop."""
        self._request(testbed_messages[SHUTDOWN])
        self._client.send_stop()
851
class ExperimentControllerProxy(object):
    """Client-side proxy that talks to an experiment controller through a
    `server.Client` connection.

    Every request is one of the `controller_messages` strings; every reply
    is parsed as "<code>|<base64 text>".
    """

    def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
        """Optionally launch an ExperimentControllerServer, then connect.

        root_dir -- passed to the server (when launched) and to
            `server.Client`.
        log_level -- when equal to AccessConfiguration.DEBUG_LEVEL the
            launched server is switched to debug logging; only consulted
            when `launch` is true.
        experiment_xml -- xml description of the experiment; mandatory when
            `launch` is true.
        launch -- when false no server is started, only the client is
            created.

        Raises RuntimeError if `launch` is true and `experiment_xml` is
        None.
        """
        if launch:
            if experiment_xml is None:
                # adjacent literals instead of a backslash continuation, so
                # the source indentation does not leak into the message
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            # launch daemon
            s = ExperimentControllerServer(root_dir, experiment_xml)
            if log_level == AccessConfiguration.DEBUG_LEVEL:
                s.set_debug_log_level()
            s.run()
        # create_client
        self._client = server.Client(root_dir)

    def _request(self, msg):
        """Send `msg` and return the parsed reply as a (code, text) tuple."""
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _checked_request(self, msg):
        """Send `msg` and return the reply text, raising RuntimeError with
        that text when the reply code is ERROR."""
        code, text = self._request(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Reply text of an XML request.

        Raises RuntimeError when the reply code is ERROR.
        """
        return self._checked_request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS request for testbed `testbed_guid`.

        access_config -- object exposing get_attribute_value(name); the
            attributes "mode", "communication", "host", "user", "port",
            "rootDirectory" and "useAgent" are read from it.

        Raises RuntimeError when the reply code is ERROR.
        """
        mode = access_config.get_attribute_value("mode")
        communication = access_config.get_attribute_value("communication")
        host = access_config.get_attribute_value("host")
        user = access_config.get_attribute_value("user")
        port = access_config.get_attribute_value("port")
        root_dir = access_config.get_attribute_value("rootDirectory")
        use_agent = access_config.get_attribute_value("useAgent")
        # NOTE(review): these values are interpolated without base64
        # encoding; a "|" inside any of them would presumably confuse the
        # receiving side -- confirm against the server parser.
        msg = controller_messages[ACCESS] % (testbed_guid, mode,
                communication, host, user, port, root_dir, use_agent)
        self._checked_request(msg)

    def trace(self, testbed_guid, guid, trace_id):
        """Send a TRACE request and return the reply text.

        Unlike the other methods, any reply code other than OK (not just
        ERROR) raises RuntimeError with the reply text.
        """
        msg = controller_messages[TRACE] % (testbed_guid, guid, trace_id)
        code, text = self._request(msg)
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send a START request; raises RuntimeError on an ERROR reply."""
        self._checked_request(controller_messages[START])

    def stop(self):
        """Send a STOP request; raises RuntimeError on an ERROR reply."""
        self._checked_request(controller_messages[STOP])

    def is_finished(self, guid):
        """Send a FINISHED request for `guid`.

        Returns True only when the reply text is exactly "True".
        Raises RuntimeError on an ERROR reply.
        """
        text = self._checked_request(controller_messages[FINISHED] % guid)
        return text == "True"

    def shutdown(self):
        """Send a SHUTDOWN request and then stop the client connection.

        Raises RuntimeError on an ERROR reply; in that case `send_stop` is
        not called.
        """
        self._checked_request(controller_messages[SHUTDOWN])
        self._client.send_stop()
951