Ticket #12: proxy reconnection
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11
12 # PROTOCOL REPLIES
13 OK = 0
14 ERROR = 1
15
16 # PROTOCOL INSTRUCTION MESSAGES
17 XML = 2 
18 ACCESS  = 3
19 TRACE   = 4
20 FINISHED    = 5
21 START   = 6
22 STOP    = 7
23 SHUTDOWN    = 8
24 CONFIGURE   = 9
25 CREATE      = 10
26 CREATE_SET  = 11
27 FACTORY_SET = 12
28 CONNECT     = 13
29 CROSS_CONNECT   = 14
30 ADD_TRACE   = 15
31 ADD_ADDRESS = 16
32 ADD_ROUTE   = 17
33 DO_SETUP    = 18
34 DO_CREATE   = 19
35 DO_CONNECT  = 20
36 DO_CONFIGURE    = 21
37 DO_CROSS_CONNECT    = 22
38 GET = 23
39 SET = 24
40 ACTION  = 25
41 STATUS  = 26
42 GUIDS  = 27
43
44 # PARAMETER TYPE
45 STRING  =  100
46 INTEGER = 101
47 BOOL    = 102
48 FLOAT   = 103
49
50 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
51 controller_messages = dict({
52     XML:    "%d" % XML,
53     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
54     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
55     FINISHED:   "%d|%s" % (FINISHED, "%d"),
56     START:  "%d" % START,
57     STOP:   "%d" % STOP,
58     SHUTDOWN:   "%d" % SHUTDOWN,
59     })
60
61 # TESTBED INSTANCE PROTOCOL MESSAGES
62 testbed_messages = dict({
63     TRACE:  "%d|%s" % (TRACE, "%d|%s"),
64     START:  "%d" % START,
65     STOP:   "%d" % STOP,
66     SHUTDOWN:   "%d" % SHUTDOWN,
67     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68     CREATE: "%d|%s" % (CREATE, "%d|%s"),
69     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
72     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
75     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76     DO_SETUP:   "%d" % DO_SETUP,
77     DO_CREATE:  "%d" % DO_CREATE,
78     DO_CONNECT: "%d" % DO_CONNECT,
79     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
80     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
81     GET:    "%d|%s" % (GET, "%s|%d|%s"),
82     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84     STATUS: "%d|%s" % (STATUS, "%d"),
85     GUIDS:  "%d" % GUIDS,
86     })
87
88 instruction_text = dict({
89     OK:     "OK",
90     ERROR:  "ERROR",
91     XML:    "XML",
92     ACCESS: "ACCESS",
93     TRACE:  "TRACE",
94     FINISHED:   "FINISHED",
95     START:  "START",
96     STOP:   "STOP",
97     SHUTDOWN:   "SHUTDOWN",
98     CONFIGURE:  "CONFIGURE",
99     CREATE: "CREATE",
100     CREATE_SET: "CREATE_SET",
101     FACTORY_SET:    "FACTORY_SET",
102     CONNECT:    "CONNECT",
103     CROSS_CONNECT: "CROSS_CONNECT",
104     ADD_TRACE:  "ADD_TRACE",
105     ADD_ADDRESS:    "ADD_ADDRESS",
106     ADD_ROUTE:  "ADD_ROUTE",
107     DO_SETUP:   "DO_SETUP",
108     DO_CREATE:  "DO_CREATE",
109     DO_CONNECT: "DO_CONNECT",
110     DO_CONFIGURE:   "DO_CONFIGURE",
111     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
112     GET:    "GET",
113     SET:    "SET",
114     ACTION: "ACTION",
115     STATUS: "STATUS",
116     GUIDS:  "GUIDS",
117     STRING: "STRING",
118     INTEGER:    "INTEGER",
119     BOOL:   "BOOL",
120     FLOAT:  "FLOAT"
121     })
122
123 def get_type(value):
124     if isinstance(value, bool):
125         return BOOL
126     elif isinstance(value, int):
127         return INTEGER
128     elif isinstance(value, float):
129         return FLOAT
130     else:
131         return STRING
132
133 def set_type(type, value):
134     if type == INTEGER:
135         value = int(value)
136     elif type == FLOAT:
137         value = float(value)
138     elif type == BOOL:
139         value = value == "True"
140     else:
141         value = str(value)
142     return value
143
144 def log_msg(server, params):
145     instr = int(params[0])
146     instr_txt = instruction_text[instr]
147     server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
148         instr_txt, ", ".join(map(str, params[1:]))))
149
150 def log_reply(server, reply):
151     res = reply.split("|")
152     code = int(res[0])
153     code_txt = instruction_text[code]
154     txt = base64.b64decode(res[1])
155     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
156             code_txt, txt))
157
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
159     if python_code:
160         # launch daemon
161         proc = server.popen_ssh_subprocess(python_code, host = host,
162             port = port, user = user, agent = agent)
163         if proc.poll():
164             err = proc.stderr.read()
165             raise RuntimeError("Client could not be executed: %s" % \
166                     err)
167     # create client
168     return server.Client(root_dir, host = host, port = port, user = user, 
169             agent = agent)
170
171 def to_server_log_level(log_level):
172     return server.DEBUG_LEVEL \
173             if log_level == AccessConfiguration.DEBUG_LEVEL \
174                 else server.ERROR_LEVEL
175
176 def get_access_config_params(access_config):
177     root_dir = access_config.get_attribute_value("rootDirectory")
178     log_level = access_config.get_attribute_value("logLevel")
179     log_level = to_server_log_level(log_level)
180     user = host = port = agent = None
181     communication = access_config.get_attribute_value("communication")
182     if communication == AccessConfiguration.ACCESS_SSH:
183         user = access_config.get_attribute_value("user")
184         host = access_config.get_attribute_value("host")
185         port = access_config.get_attribute_value("port")
186         agent = access_config.get_attribute_value("useAgent")
187     return (root_dir, log_level, user, host, port, agent)
188
189 class AccessConfiguration(AttributesMap):
190     MODE_SINGLE_PROCESS = "SINGLE"
191     MODE_DAEMON = "DAEMON"
192     ACCESS_SSH = "SSH"
193     ACCESS_LOCAL = "LOCAL"
194     ERROR_LEVEL = "Error"
195     DEBUG_LEVEL = "Debug"
196
197     def __init__(self):
198         super(AccessConfiguration, self).__init__()
199         self.add_attribute(name = "mode",
200                 help = "Instance execution mode",
201                 type = Attribute.ENUM,
202                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
203                 allowed = [AccessConfiguration.MODE_DAEMON,
204                     AccessConfiguration.MODE_SINGLE_PROCESS],
205                 validation_function = validation.is_enum)
206         self.add_attribute(name = "communication",
207                 help = "Instance communication mode",
208                 type = Attribute.ENUM,
209                 value = AccessConfiguration.ACCESS_LOCAL,
210                 allowed = [AccessConfiguration.ACCESS_LOCAL,
211                     AccessConfiguration.ACCESS_SSH],
212                 validation_function = validation.is_enum)
213         self.add_attribute(name = "host",
214                 help = "Host where the testbed will be executed",
215                 type = Attribute.STRING,
216                 value = "localhost",
217                 validation_function = validation.is_string)
218         self.add_attribute(name = "user",
219                 help = "User on the Host to execute the testbed",
220                 type = Attribute.STRING,
221                 value = getpass.getuser(),
222                 validation_function = validation.is_string)
223         self.add_attribute(name = "port",
224                 help = "Port on the Host",
225                 type = Attribute.INTEGER,
226                 value = 22,
227                 validation_function = validation.is_integer)
228         self.add_attribute(name = "rootDirectory",
229                 help = "Root directory for storing process files",
230                 type = Attribute.STRING,
231                 value = ".",
232                 validation_function = validation.is_string) # TODO: validation.is_path
233         self.add_attribute(name = "useAgent",
234                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
235                 type = Attribute.BOOL,
236                 value = False,
237                 validation_function = validation.is_bool)
238         self.add_attribute(name = "logLevel",
239                 help = "Log level for instance",
240                 type = Attribute.ENUM,
241                 value = AccessConfiguration.ERROR_LEVEL,
242                 allowed = [AccessConfiguration.ERROR_LEVEL,
243                     AccessConfiguration.DEBUG_LEVEL],
244                 validation_function = validation.is_enum)
245
246 def create_controller(xml, access_config = None):
247     mode = None if not access_config else \
248             access_config.get_attribute_value("mode")
249     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
250         from nepi.core.execute import ExperimentController
251         return ExperimentController(xml)
252     elif mode == AccessConfiguration.MODE_DAEMON:
253         (root_dir, log_level, user, host, port, agent) = \
254                 get_access_config_params(access_config)
255         return ExperimentControllerProxy(root_dir, log_level,
256                 experiment_xml = xml, host = host, port = port, user = user, 
257                 agent = agent)
258     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
259
260 def create_testbed_instance(testbed_id, testbed_version, access_config):
261     mode = None if not access_config else access_config.get_attribute_value("mode")
262     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
263         return  _build_testbed_instance(testbed_id, testbed_version)
264     elif mode == AccessConfiguration.MODE_DAEMON:
265         (root_dir, log_level, user, host, port, agent) = \
266                 get_access_config_params(access_config)
267         return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id, 
268                 testbed_version = testbed_version, host = host, port = port,
269                 user = user, agent = agent)
270     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
271
272 def _build_testbed_instance(testbed_id, testbed_version):
273     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
274     if not mod_name in sys.modules:
275         __import__(mod_name)
276     module = sys.modules[mod_name]
277     return module.TestbedInstance(testbed_version)
278
279 class TestbedInstanceServer(server.Server):
280     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
281         super(TestbedInstanceServer, self).__init__(root_dir, log_level)
282         self._testbed_id = testbed_id
283         self._testbed_version = testbed_version
284         self._testbed = None
285
286     def post_daemonize(self):
287         self._testbed = _build_testbed_instance(self._testbed_id, 
288                 self._testbed_version)
289
290     def reply_action(self, msg):
291         params = msg.split("|")
292         instruction = int(params[0])
293         log_msg(self, params)
294         try:
295             if instruction == TRACE:
296                 reply = self.trace(params)
297             elif instruction == START:
298                 reply = self.start(params)
299             elif instruction == STOP:
300                 reply = self.stop(params)
301             elif instruction == SHUTDOWN:
302                 reply = self.shutdown(params)
303             elif instruction == CONFIGURE:
304                 reply = self.defer_configure(params)
305             elif instruction == CREATE:
306                 reply = self.defer_create(params)
307             elif instruction == CREATE_SET:
308                 reply = self.defer_create_set(params)
309             elif instruction == FACTORY_SET:
310                 reply = self.defer_factory_set(params)
311             elif instruction == CONNECT:
312                 reply = self.defer_connect(params)
313             elif instruction == CROSS_CONNECT:
314                 reply = self.defer_cross_connect(params)
315             elif instruction == ADD_TRACE:
316                 reply = self.defer_add_trace(params)
317             elif instruction == ADD_ADDRESS:
318                 reply = self.defer_add_address(params)
319             elif instruction == ADD_ROUTE:
320                 reply = self.defer_add_route(params)
321             elif instruction == DO_SETUP:
322                 reply = self.do_setup(params)
323             elif instruction == DO_CREATE:
324                 reply = self.do_create(params)
325             elif instruction == DO_CONNECT:
326                 reply = self.do_connect(params)
327             elif instruction == DO_CONFIGURE:
328                 reply = self.do_configure(params)
329             elif instruction == DO_CROSS_CONNECT:
330                 reply = self.do_cross_connect(params)
331             elif instruction == GET:
332                 reply = self.get(params)
333             elif instruction == SET:
334                 reply = self.set(params)
335             elif instruction == ACTION:
336                 reply = self.action(params)
337             elif instruction == STATUS:
338                 reply = self.status(params)
339             elif instruction == GUIDS:
340                 reply = self.guids(params)
341             else:
342                 error = "Invalid instruction %s" % instruction
343                 self.log_error(error)
344                 result = base64.b64encode(error)
345                 reply = "%d|%s" % (ERROR, result)
346         except:
347             error = self.log_error()
348             result = base64.b64encode(error)
349             reply = "%d|%s" % (ERROR, result)
350         log_reply(self, reply)
351         return reply
352
353     def guids(self, params):
354         guids = self._testbed.guids
355         guids = ",".join(map(str, guids))
356         result = base64.b64encode(guids)
357         return "%d|%s" % (OK, result)
358
359     def defer_create(self, params):
360         guid = int(params[1])
361         factory_id = params[2]
362         self._testbed.defer_create(guid, factory_id)
363         return "%d|%s" % (OK, "")
364
365     def trace(self, params):
366         guid = int(params[1])
367         trace_id = params[2]
368         trace = self._testbed.trace(guid, trace_id)
369         result = base64.b64encode(trace)
370         return "%d|%s" % (OK, result)
371
372     def start(self, params):
373         self._testbed.start()
374         return "%d|%s" % (OK, "")
375
376     def stop(self, params):
377         self._testbed.stop()
378         return "%d|%s" % (OK, "")
379
380     def shutdown(self, params):
381         self._testbed.shutdown()
382         return "%d|%s" % (OK, "")
383
384     def defer_configure(self, params):
385         name = base64.b64decode(params[1])
386         value = base64.b64decode(params[2])
387         type = int(params[3])
388         value = set_type(type, value)
389         self._testbed.defer_configure(name, value)
390         return "%d|%s" % (OK, "")
391
392     def defer_create_set(self, params):
393         guid = int(params[1])
394         name = base64.b64decode(params[2])
395         value = base64.b64decode(params[3])
396         type = int(params[4])
397         value = set_type(type, value)
398         self._testbed.defer_create_set(guid, name, value)
399         return "%d|%s" % (OK, "")
400
401     def defer_factory_set(self, params):
402         name = base64.b64decode(params[1])
403         value = base64.b64decode(params[2])
404         type = int(params[3])
405         value = set_type(type, value)
406         self._testbed.defer_factory_set(name, value)
407         return "%d|%s" % (OK, "")
408
409     def defer_connect(self, params):
410         guid1 = int(params[1])
411         connector_type_name1 = params[2]
412         guid2 = int(params[3])
413         connector_type_name2 = params[4]
414         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
415             connector_type_name2)
416         return "%d|%s" % (OK, "")
417
418     def defer_cross_connect(self, params):
419         guid = int(params[1])
420         connector_type_name = params[2]
421         cross_guid = int(params[3])
422         connector_type_name = params[4]
423         cross_guid = int(params[5])
424         cross_testbed_id = params[6]
425         cross_factory_id = params[7]
426         cross_connector_type_name = params[8]
427         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
428             cross_testbed_id, cross_factory_id, cross_connector_type_name)
429         return "%d|%s" % (OK, "")
430
431     def defer_add_trace(self, params):
432         guid = int(params[1])
433         trace_id = params[2]
434         self._testbed.defer_add_trace(guid, trace_id)
435         return "%d|%s" % (OK, "")
436
437     def defer_add_address(self, params):
438         guid = int(params[1])
439         address = params[2]
440         netprefix = int(params[3])
441         broadcast = params[4]
442         self._testbed.defer_add_address(guid, address, netprefix,
443                 broadcast)
444         return "%d|%s" % (OK, "")
445
446     def defer_add_route(self, params):
447         guid = int(params[1])
448         destination = params[2]
449         netprefix = int(params[3])
450         nexthop = params[4]
451         self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
452         return "%d|%s" % (OK, "")
453
454     def do_setup(self, params):
455         self._testbed.do_setup()
456         return "%d|%s" % (OK, "")
457
458     def do_create(self, params):
459         self._testbed.do_create()
460         return "%d|%s" % (OK, "")
461
462     def do_connect(self, params):
463         self._testbed.do_connect()
464         return "%d|%s" % (OK, "")
465
466     def do_configure(self, params):
467         self._testbed.do_configure()
468         return "%d|%s" % (OK, "")
469
470     def do_cross_connect(self, params):
471         self._testbed.do_cross_connect()
472         return "%d|%s" % (OK, "")
473
474     def get(self, params):
475         time = params[1]
476         guid = int(param[2] )
477         name = base64.b64decode(params[3])
478         value = self._testbed.get(time, guid, name)
479         result = base64.b64encode(str(value))
480         return "%d|%s" % (OK, result)
481
482     def set(self, params):
483         time = params[1]
484         guid = int(params[2])
485         name = base64.b64decode(params[3])
486         value = base64.b64decode(params[4])
487         type = int(params[3])
488         value = set_type(type, value)
489         self._testbed.set(time, guid, name, value)
490         return "%d|%s" % (OK, "")
491
492     def action(self, params):
493         time = params[1]
494         guid = int(params[2])
495         command = base64.b64decode(params[3])
496         self._testbed.action(time, guid, command)
497         return "%d|%s" % (OK, "")
498
499     def status(self, params):
500         guid = int(params[1])
501         status = self._testbed.status(guid)
502         result = base64.b64encode(str(status))
503         return "%d|%s" % (OK, result)
504  
505 class ExperimentControllerServer(server.Server):
506     def __init__(self, root_dir, log_level, experiment_xml):
507         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
508         self._experiment_xml = experiment_xml
509         self._controller = None
510
511     def post_daemonize(self):
512         from nepi.core.execute import ExperimentController
513         self._controller = ExperimentController(self._experiment_xml)
514
515     def reply_action(self, msg):
516         params = msg.split("|")
517         instruction = int(params[0])
518         log_msg(self, params)
519         try:
520             if instruction == XML:
521                 reply = self.experiment_xml(params)
522             elif instruction == ACCESS:
523                 reply = self.set_access_configuration(params)
524             elif instruction == TRACE:
525                 reply = self.trace(params)
526             elif instruction == FINISHED:
527                 reply = self.is_finished(params)
528             elif instruction == START:
529                 reply = self.start(params)
530             elif instruction == STOP:
531                 reply = self.stop(params)
532             elif instruction == SHUTDOWN:
533                 reply = self.shutdown(params)
534             else:
535                 error = "Invalid instruction %s" % instruction
536                 self.log_error(error)
537                 result = base64.b64encode(error)
538                 reply = "%d|%s" % (ERROR, result)
539         except:
540             error = self.log_error()
541             result = base64.b64encode(error)
542             reply = "%d|%s" % (ERROR, result)
543         log_reply(self, reply)
544         return reply
545
546     def experiment_xml(self, params):
547         xml = self._controller.experiment_xml
548         result = base64.b64encode(xml)
549         return "%d|%s" % (OK, result)
550
551     def set_access_configuration(self, params):
552         testbed_guid = int(params[1])
553         mode = params[2]
554         communication = params[3]
555         host = params[4]
556         user = params[5]
557         port = int(params[6])
558         root_dir = params[7]
559         use_agent = params[8] == "True"
560         log_level = params[9]
561         access_config = AccessConfiguration()
562         access_config.set_attribute_value("mode", mode)
563         access_config.set_attribute_value("communication", communication)
564         access_config.set_attribute_value("host", host)
565         access_config.set_attribute_value("user", user)
566         access_config.set_attribute_value("port", port)
567         access_config.set_attribute_value("rootDirectory", root_dir)
568         access_config.set_attribute_value("useAgent", use_agent)
569         access_config.set_attribute_value("logLevel", log_level)
570         self._controller.set_access_configuration(testbed_guid, 
571                 access_config)
572         return "%d|%s" % (OK, "")
573
574     def trace(self, params):
575         testbed_guid = int(params[1])
576         guid = int(params[2])
577         trace_id = params[3]
578         trace = self._controller.trace(testbed_guid, guid, trace_id)
579         result = base64.b64encode(trace)
580         return "%d|%s" % (OK, result)
581
582     def is_finished(self, params):
583         guid = int(params[1])
584         status = self._controller.is_finished(guid)
585         result = base64.b64encode(str(status))
586         return "%d|%s" % (OK, result)
587
588     def start(self, params):
589         self._controller.start()
590         return "%d|%s" % (OK, "")
591
592     def stop(self, params):
593         self._controller.stop()
594         return "%d|%s" % (OK, "")
595
596     def shutdown(self, params):
597         self._controller.shutdown()
598         return "%d|%s" % (OK, "")
599
600 class TestbedInstanceProxy(object):
601     def __init__(self, root_dir, log_level, testbed_id = None, 
602             testbed_version = None, launch = True, host = None, 
603             port = None, user = None, agent = None):
604         if launch:
605             if testbed_id == None or testbed_version == None:
606                 raise RuntimeError("To launch a TesbedInstance server a \
607                         testbed_id and testbed_version are required")
608             # ssh
609             if host != None:
610                 python_code = "from nepi.util.proxy import \
611                         TesbedInstanceServer;\
612                         s = TestbedInstanceServer('%s', %d, '%s', '%s');\
613                         s.run()" % (root_dir, log_level, testbed_id, 
614                                 testbed_version)
615                 self._client = launch_ssh_daemon_client(root_dir, python_code,
616                         host, port, user, agent)
617             else:
618                 # launch daemon
619                 s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
620                     testbed_version)
621                 s.run()
622                 # create client
623                 self._client = server.Client(root_dir)
624         else:
625             # attempt to reconnect
626             if host != None:
627                 self._client = launch_ssh_daemon_client(root_dir, None,
628                         host, port, user, agent)
629             else:
630                 self._client = server.Client(root_dir)
631
632     @property
633     def guids(self):
634         msg = testbed_messages[GUIDS]
635         self._client.send_msg(msg)
636         reply = self._client.read_reply()
637         result = reply.split("|")
638         code = int(result[0])
639         text = base64.b64decode(result[1])
640         if code == ERROR:
641             raise RuntimeError(text)
642         return map(int, text.split(","))
643
644     def defer_configure(self, name, value):
645         msg = testbed_messages[CONFIGURE]
646         type = get_type(value)
647         # avoid having "|" in this parameters
648         name = base64.b64encode(name)
649         value = base64.b64encode(str(value))
650         msg = msg % (name, value, type)
651         self._client.send_msg(msg)
652         reply = self._client.read_reply()
653         result = reply.split("|")
654         code = int(result[0])
655         text = base64.b64decode(result[1])
656         if code == ERROR:
657             raise RuntimeError(text)
658
659     def defer_create(self, guid, factory_id):
660         msg = testbed_messages[CREATE]
661         msg = msg % (guid, factory_id)
662         self._client.send_msg(msg)
663         reply = self._client.read_reply()
664         result = reply.split("|")
665         code = int(result[0])
666         text = base64.b64decode(result[1])
667         if code == ERROR:
668             raise RuntimeError(text)
669
670     def defer_create_set(self, guid, name, value):
671         msg = testbed_messages[CREATE_SET]
672         type = get_type(value)
673         # avoid having "|" in this parameters
674         name = base64.b64encode(name)
675         value = base64.b64encode(str(value))
676         msg = msg % (guid, name, value, type)
677         self._client.send_msg(msg)
678         reply = self._client.read_reply()
679         result = reply.split("|")
680         code = int(result[0])
681         text = base64.b64decode(result[1])
682         if code == ERROR:
683             raise RuntimeError(text)
684
685     def defer_factory_set(self, guid, name, value):
686         msg = testbed_messages[FACTORY_SET]
687         type = get_type(value)
688         # avoid having "|" in this parameters
689         name = base64.b64encode(name)
690         value = base64.b64encode(str(value))
691         msg = msg % (guid, name, value, type)
692         self._client.send_msg(msg)
693         reply = self._client.read_reply()
694         result = reply.split("|")
695         code = int(result[0])
696         text = base64.b64decode(result[1])
697         if code == ERROR:
698             raise RuntimeError(text)
699
700     def defer_connect(self, guid1, connector_type_name1, guid2, 
701             connector_type_name2): 
702         msg = testbed_messages[CONNECT]
703         msg = msg % (guid1, connector_type_name1, guid2, 
704             connector_type_name2)
705         self._client.send_msg(msg)
706         reply = self._client.read_reply()
707         result = reply.split("|")
708         code = int(result[0])
709         text = base64.b64decode(result[1])
710         if code == ERROR:
711             raise RuntimeError(text)
712
713     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
714             cross_testbed_id, cross_factory_id, cross_connector_type_name):
715         msg = testbed_messages[CROSS_CONNECT]
716         msg = msg % (guid, connector_type_name, cross_guid, 
717             cross_testbed_id, cross_factory_id, cross_connector_type_name)
718         self._client.send_msg(msg)
719         reply = self._client.read_reply()
720         result = reply.split("|")
721         code = int(result[0])
722         text = base64.b64decode(result[1])
723         if code == ERROR:
724             raise RuntimeError(text)
725
726     def defer_add_trace(self, guid, trace_id):
727         msg = testbed_messages[ADD_TRACE]
728         msg = msg % (guid, trace_id)
729         self._client.send_msg(msg)
730         reply = self._client.read_reply()
731         result = reply.split("|")
732         code = int(result[0])
733         text = base64.b64decode(result[1])
734         if code == ERROR:
735             raise RuntimeError(text)
736
737     def defer_add_address(self, guid, address, netprefix, broadcast): 
738         msg = testbed_messages[ADD_ADDRESS]
739         msg = msg % (guid, address, netprefix, broadcast)
740         self._client.send_msg(msg)
741         reply = self._client.read_reply()
742         result = reply.split("|")
743         code = int(result[0])
744         text = base64.b64decode(result[1])
745         if code == ERROR:
746             raise RuntimeError(text)
747
748     def defer_add_route(self, guid, destination, netprefix, nexthop):
749         msg = testbed_messages[ADD_ROUTE]
750         msg = msg % (guid, destination, netprefix, nexthop)
751         self._client.send_msg(msg)
752         reply = self._client.read_reply()
753         result = reply.split("|")
754         code = int(result[0])
755         text = base64.b64decode(result[1])
756         if code == ERROR:
757             raise RuntimeError(text)
758
759     def do_setup(self):
760         msg = testbed_messages[DO_SETUP]
761         self._client.send_msg(msg)
762         reply = self._client.read_reply()
763         result = reply.split("|")
764         code = int(result[0])
765         text = base64.b64decode(result[1])
766         if code == ERROR:
767             raise RuntimeError(text)
768
769     def do_create(self):
770         msg = testbed_messages[DO_CREATE]
771         self._client.send_msg(msg)
772         reply = self._client.read_reply()
773         result = reply.split("|")
774         code = int(result[0])
775         text = base64.b64decode(result[1])
776         if code == ERROR:
777             raise RuntimeError(text)
778
779     def do_connect(self):
780         msg = testbed_messages[DO_CONNECT]
781         self._client.send_msg(msg)
782         reply = self._client.read_reply()
783         result = reply.split("|")
784         code = int(result[0])
785         text = base64.b64decode(result[1])
786         if code == ERROR:
787             raise RuntimeError(text)
788
789     def do_configure(self):
790         msg = testbed_messages[DO_CONFIGURE]
791         self._client.send_msg(msg)
792         reply = self._client.read_reply()
793         result = reply.split("|")
794         code = int(result[0])
795         text = base64.b64decode(result[1])
796         if code == ERROR:
797             raise RuntimeError(text)
798
799     def do_cross_connect(self):
800         msg = testbed_messages[DO_CROSS_CONNECT]
801         self._client.send_msg(msg)
802         reply = self._client.read_reply()
803         result = reply.split("|")
804         code = int(result[0])
805         text = base64.b64decode(result[1])
806         if code == ERROR:
807             raise RuntimeError(text)
808
809     def start(self, time = TIME_NOW):
810         msg = testbed_messages[START]
811         self._client.send_msg(msg)
812         reply = self._client.read_reply()
813         result = reply.split("|")
814         code = int(result[0])
815         text = base64.b64decode(result[1])
816         if code == ERROR:
817             raise RuntimeError(text)
818
819     def stop(self, time = TIME_NOW):
820         msg = testbed_messages[STOP]
821         self._client.send_msg(msg)
822         reply = self._client.read_reply()
823         result = reply.split("|")
824         code = int(result[0])
825         text = base64.b64decode(result[1])
826         if code == ERROR:
827             raise RuntimeError(text)
828
829     def set(self, time, guid, name, value):
830         msg = testbed_messages[SET]
831         type = get_type(value)
832         # avoid having "|" in this parameters
833         name = base64.b64encode(name)
834         value = base64.b64encode(str(value))
835         msg = msg % (time, guid, name, value, type)
836         self._client.send_msg(msg)
837         reply = self._client.read_reply()
838         result = reply.split("|")
839         code = int(result[0])
840         text = base64.b64decode(result[1])
841         if code == ERROR:
842             raise RuntimeError(text)
843
844     def get(self, time, guid, name):
845         msg = testbed_messages[GET]
846         # avoid having "|" in this parameters
847         name = base64.b64encode(name)
848         msg = msg % (time, guid, name)
849         self._client.send_msg(msg)
850         reply = self._client.read_reply()
851         result = reply.split("|")
852         code = int(result[0])
853         text = base64.b64decode(result[1])
854         if code == ERROR:
855             raise RuntimeError(text)
856         return text
857
858     def action(self, time, guid, action):
859         msg = testbed_messages[ACTION]
860         msg = msg % (time, guid, action)
861         self._client.send_msg(msg)
862         reply = self._client.read_reply()
863         result = reply.split("|")
864         code = int(result[0])
865         text = base64.b64decode(result[1])
866         if code == ERROR:
867             raise RuntimeError(text)
868
869     def status(self, guid):
870         msg = testbed_messages[STATUS]
871         msg = msg % (guid)
872         self._client.send_msg(msg)
873         reply = self._client.read_reply()
874         result = reply.split("|")
875         code = int(result[0])
876         text = base64.b64decode(result[1])
877         if code == ERROR:
878             raise RuntimeError(text)
879         return int(text)
880
881     def trace(self, guid, trace_id):
882         msg = testbed_messages[TRACE]
883         msg = msg % (guid, trace_id)
884         self._client.send_msg(msg)
885         reply = self._client.read_reply()
886         result = reply.split("|")
887         code = int(result[0])
888         text = base64.b64decode(result[1])
889         if code == ERROR:
890             raise RuntimeError(text)
891         return text
892
893     def shutdown(self):
894         msg = testbed_messages[SHUTDOWN]
895         self._client.send_msg(msg)
896         reply = self._client.read_reply()
897         result = reply.split("|")
898         code = int(result[0])
899         text = base64.b64decode(result[1])
900         if code == ERROR:
901             raise RuntimeError(text)
902         self._client.send_stop()
903
904 class ExperimentControllerProxy(object):
905     def __init__(self, root_dir, log_level, experiment_xml = None, 
906             launch = True, host = None, port = None, user = None, 
907             agent = None):
908         if launch:
909             # launch server
910             if experiment_xml == None:
911                 raise RuntimeError("To launch a ExperimentControllerServer a \
912                         xml description of the experiment is required")
913             # ssh
914             if host != None:
915                 xml = experiment_xml
916                 xml = xml.replace("'", r"\'")
917                 xml = xml.replace("\"", r"\'")
918                 xml = xml.replace("\n", r"")
919                 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
920                         s = ExperimentControllerServer(%r, %r, %r);\
921                         s.run()" % (root_dir, log_level, xml)
922                 self._client = launch_ssh_daemon_client(root_dir, python_code,
923                         host, port, user, agent)
924             else:
925                 # launch daemon
926                 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
927                 s.run()
928                 # create client
929                 self._client = server.Client(root_dir)
930         else:
931             # attempt to reconnect
932             if host != None:
933                 self._client = launch_ssh_daemon_client(root_dir, None,
934                         host, port, user, agent)
935             else:
936                 self._client = server.Client(root_dir)
937
938     @property
939     def experiment_xml(self):
940         msg = controller_messages[XML]
941         self._client.send_msg(msg)
942         reply = self._client.read_reply()
943         result = reply.split("|")
944         code = int(result[0])
945         text = base64.b64decode(result[1])
946         if code == ERROR:
947             raise RuntimeError(text)
948         return text
949
950     def set_access_configuration(self, testbed_guid, access_config):
951         mode = access_config.get_attribute_value("mode")
952         communication = access_config.get_attribute_value("communication")
953         host = access_config.get_attribute_value("host")
954         user = access_config.get_attribute_value("user")
955         port = access_config.get_attribute_value("port")
956         root_dir = access_config.get_attribute_value("rootDirectory")
957         use_agent = access_config.get_attribute_value("useAgent")
958         log_level = access_config.get_attribute_value("logLevel")
959         msg = controller_messages[ACCESS]
960         msg = msg % (testbed_guid, mode, communication, host, user, port, 
961                 root_dir, use_agent, log_level)
962         self._client.send_msg(msg)
963         reply = self._client.read_reply()
964         result = reply.split("|")
965         code = int(result[0])
966         text =  base64.b64decode(result[1])
967         if code == ERROR:
968             raise RuntimeError(text)
969
970     def trace(self, testbed_guid, guid, trace_id):
971         msg = controller_messages[TRACE]
972         msg = msg % (testbed_guid, guid, trace_id)
973         self._client.send_msg(msg)
974         reply = self._client.read_reply()
975         result = reply.split("|")
976         code = int(result[0])
977         text =  base64.b64decode(result[1])
978         if code == OK:
979             return text
980         raise RuntimeError(text)
981
982     def start(self):
983         msg = controller_messages[START]
984         self._client.send_msg(msg)
985         reply = self._client.read_reply()
986         result = reply.split("|")
987         code = int(result[0])
988         text =  base64.b64decode(result[1])
989         if code == ERROR:
990             raise RuntimeError(text)
991
992     def stop(self):
993         msg = controller_messages[STOP]
994         self._client.send_msg(msg)
995         reply = self._client.read_reply()
996         result = reply.split("|")
997         code = int(result[0])
998         text =  base64.b64decode(result[1])
999         if code == ERROR:
1000             raise RuntimeError(text)
1001
1002     def is_finished(self, guid):
1003         msg = controller_messages[FINISHED]
1004         msg = msg % guid
1005         self._client.send_msg(msg)
1006         reply = self._client.read_reply()
1007         result = reply.split("|")
1008         code = int(result[0])
1009         text = base64.b64decode(result[1])
1010         if code == ERROR:
1011             raise RuntimeError(text)
1012         return text == "True"
1013
1014     def shutdown(self):
1015         msg = controller_messages[SHUTDOWN]
1016         self._client.send_msg(msg)
1017         reply = self._client.read_reply()
1018         result = reply.split("|")
1019         code = int(result[0])
1020         text =  base64.b64decode(result[1])
1021         if code == ERROR:
1022             raise RuntimeError(text)
1023         self._client.send_stop()
1024