a83f2b827a6ddacb76175d9b7795ecfe9ff4871f
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11
12 # PROTOCOL REPLIES
13 OK = 0
14 ERROR = 1
15
16 # PROTOCOL INSTRUCTION MESSAGES
17 XML = 2 
18 ACCESS  = 3
19 TRACE   = 4
20 FINISHED    = 5
21 START   = 6
22 STOP    = 7
23 SHUTDOWN    = 8
24 CONFIGURE   = 9
25 CREATE      = 10
26 CREATE_SET  = 11
27 FACTORY_SET = 12
28 CONNECT     = 13
29 CROSS_CONNECT   = 14
30 ADD_TRACE   = 15
31 ADD_ADDRESS = 16
32 ADD_ROUTE   = 17
33 DO_SETUP    = 18
34 DO_CREATE   = 19
35 DO_CONNECT  = 20
36 DO_CONFIGURE    = 21
37 DO_CROSS_CONNECT    = 22
38 GET = 23
39 SET = 24
40 ACTION  = 25
41 STATUS  = 26
42 GUIDS  = 27
43
44 # PARAMETER TYPE
45 STRING  =  100
46 INTEGER = 101
47 BOOL    = 102
48 FLOAT   = 103
49
50 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
51 controller_messages = dict({
52     XML:    "%d" % XML,
53     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
54     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
55     FINISHED:   "%d|%s" % (FINISHED, "%d"),
56     START:  "%d" % START,
57     STOP:   "%d" % STOP,
58     SHUTDOWN:   "%d" % SHUTDOWN,
59     })
60
61 # TESTBED INSTANCE PROTOCOL MESSAGES
62 testbed_messages = dict({
63     TRACE:  "%d|%s" % (TRACE, "%d|%s"),
64     START:  "%d" % START,
65     STOP:   "%d" % STOP,
66     SHUTDOWN:   "%d" % SHUTDOWN,
67     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68     CREATE: "%d|%s" % (CREATE, "%d|%s"),
69     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
72     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
75     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76     DO_SETUP:   "%d" % DO_SETUP,
77     DO_CREATE:  "%d" % DO_CREATE,
78     DO_CONNECT: "%d" % DO_CONNECT,
79     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
80     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
81     GET:    "%d|%s" % (GET, "%s|%d|%s"),
82     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84     STATUS: "%d|%s" % (STATUS, "%d"),
85     GUIDS:  "%d" % GUIDS,
86     })
87
88 instruction_text = dict({
89     OK:     "OK",
90     ERROR:  "ERROR",
91     XML:    "XML",
92     ACCESS: "ACCESS",
93     TRACE:  "TRACE",
94     FINISHED:   "FINISHED",
95     START:  "START",
96     STOP:   "STOP",
97     SHUTDOWN:   "SHUTDOWN",
98     CONFIGURE:  "CONFIGURE",
99     CREATE: "CREATE",
100     CREATE_SET: "CREATE_SET",
101     FACTORY_SET:    "FACTORY_SET",
102     CONNECT:    "CONNECT",
103     CROSS_CONNECT: "CROSS_CONNECT",
104     ADD_TRACE:  "ADD_TRACE",
105     ADD_ADDRESS:    "ADD_ADDRESS",
106     ADD_ROUTE:  "ADD_ROUTE",
107     DO_SETUP:   "DO_SETUP",
108     DO_CREATE:  "DO_CREATE",
109     DO_CONNECT: "DO_CONNECT",
110     DO_CONFIGURE:   "DO_CONFIGURE",
111     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
112     GET:    "GET",
113     SET:    "SET",
114     ACTION: "ACTION",
115     STATUS: "STATUS",
116     GUIDS:  "GUIDS",
117     STRING: "STRING",
118     INTEGER:    "INTEGER",
119     BOOL:   "BOOL",
120     FLOAT:  "FLOAT"
121     })
122
123 def get_type(value):
124     if isinstance(value, bool):
125         return BOOL
126     elif isinstance(value, int):
127         return INTEGER
128     elif isinstance(value, float):
129         return FLOAT
130     else:
131         return STRING
132
133 def set_type(type, value):
134     if type == INTEGER:
135         value = int(value)
136     elif type == FLOAT:
137         value = float(value)
138     elif type == BOOL:
139         value = value == "True"
140     else:
141         value = str(value)
142     return value
143
144 def log_msg(server, params):
145     instr = int(params[0])
146     instr_txt = instruction_text[instr]
147     server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
148         instr_txt, ", ".join(map(str, params[1:]))))
149
150 def log_reply(server, reply):
151     res = reply.split("|")
152     code = int(res[0])
153     code_txt = instruction_text[code]
154     txt = base64.b64decode(res[1])
155     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
156             code_txt, txt))
157
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
159     # launch daemon
160     proc = server.popen_ssh_subprocess(python_code, host = host,
161         port = port, user = user, agent = agent)
162     if proc.poll():
163         err = proc.stderr.read()
164         raise RuntimeError("Client could not be executed: %s" % \
165                 err)
166     # create client
167     return server.Client(root_dir, host = host, port = port, user = user, 
168             agent = agent)
169
170 def to_server_log_level(log_level):
171     return server.DEBUG_LEVEL \
172             if log_level == AccessConfiguration.DEBUG_LEVEL \
173                 else server.ERROR_LEVEL
174
175 def get_access_config_params(access_config):
176     root_dir = access_config.get_attribute_value("rootDirectory")
177     log_level = access_config.get_attribute_value("logLevel")
178     log_level = to_server_log_level(log_level)
179     user = host = port = agent = None
180     communication = access_config.get_attribute_value("communication")
181     if communication == AccessConfiguration.ACCESS_SSH:
182         user = access_config.get_attribute_value("user")
183         host = access_config.get_attribute_value("host")
184         port = access_config.get_attribute_value("port")
185         agent = access_config.get_attribute_value("useAgent")
186     return (root_dir, log_level, user, host, port, agent)
187
188 class AccessConfiguration(AttributesMap):
189     MODE_SINGLE_PROCESS = "SINGLE"
190     MODE_DAEMON = "DAEMON"
191     ACCESS_SSH = "SSH"
192     ACCESS_LOCAL = "LOCAL"
193     ERROR_LEVEL = "Error"
194     DEBUG_LEVEL = "Debug"
195
196     def __init__(self):
197         super(AccessConfiguration, self).__init__()
198         self.add_attribute(name = "mode",
199                 help = "Instance execution mode",
200                 type = Attribute.ENUM,
201                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
202                 allowed = [AccessConfiguration.MODE_DAEMON,
203                     AccessConfiguration.MODE_SINGLE_PROCESS],
204                 validation_function = validation.is_enum)
205         self.add_attribute(name = "communication",
206                 help = "Instance communication mode",
207                 type = Attribute.ENUM,
208                 value = AccessConfiguration.ACCESS_LOCAL,
209                 allowed = [AccessConfiguration.ACCESS_LOCAL,
210                     AccessConfiguration.ACCESS_SSH],
211                 validation_function = validation.is_enum)
212         self.add_attribute(name = "host",
213                 help = "Host where the testbed will be executed",
214                 type = Attribute.STRING,
215                 value = "localhost",
216                 validation_function = validation.is_string)
217         self.add_attribute(name = "user",
218                 help = "User on the Host to execute the testbed",
219                 type = Attribute.STRING,
220                 value = getpass.getuser(),
221                 validation_function = validation.is_string)
222         self.add_attribute(name = "port",
223                 help = "Port on the Host",
224                 type = Attribute.INTEGER,
225                 value = 22,
226                 validation_function = validation.is_integer)
227         self.add_attribute(name = "rootDirectory",
228                 help = "Root directory for storing process files",
229                 type = Attribute.STRING,
230                 value = ".",
231                 validation_function = validation.is_string) # TODO: validation.is_path
232         self.add_attribute(name = "useAgent",
233                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
234                 type = Attribute.BOOL,
235                 value = False,
236                 validation_function = validation.is_bool)
237         self.add_attribute(name = "logLevel",
238                 help = "Log level for instance",
239                 type = Attribute.ENUM,
240                 value = AccessConfiguration.ERROR_LEVEL,
241                 allowed = [AccessConfiguration.ERROR_LEVEL,
242                     AccessConfiguration.DEBUG_LEVEL],
243                 validation_function = validation.is_enum)
244
245 def create_controller(xml, access_config = None):
246     mode = None if not access_config else \
247             access_config.get_attribute_value("mode")
248     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
249         from nepi.core.execute import ExperimentController
250         return ExperimentController(xml)
251     elif mode == AccessConfiguration.MODE_DAEMON:
252         (root_dir, log_level, user, host, port, agent) = \
253                 get_access_config_params(access_config)
254         return ExperimentControllerProxy(root_dir, log_level,
255                 experiment_xml = xml, host = host, port = port, user = user, 
256                 agent = agent)
257     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
258
259 def create_testbed_instance(testbed_id, testbed_version, access_config):
260     mode = None if not access_config else access_config.get_attribute_value("mode")
261     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
262         return  _build_testbed_instance(testbed_id, testbed_version)
263     elif mode == AccessConfiguration.MODE_DAEMON:
264         (root_dir, log_level, user, host, port, agent) = \
265                 get_access_config_params(access_config)
266         return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id, 
267                 testbed_version = testbed_version, host = host, port = port,
268                 user = user, agent = agent)
269     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
270
271 def _build_testbed_instance(testbed_id, testbed_version):
272     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
273     if not mod_name in sys.modules:
274         __import__(mod_name)
275     module = sys.modules[mod_name]
276     return module.TestbedInstance(testbed_version)
277
278 class TestbedInstanceServer(server.Server):
279     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
280         super(TestbedInstanceServer, self).__init__(root_dir, log_level)
281         self._testbed_id = testbed_id
282         self._testbed_version = testbed_version
283         self._testbed = None
284
285     def post_daemonize(self):
286         self._testbed = _build_testbed_instance(self._testbed_id, 
287                 self._testbed_version)
288
289     def reply_action(self, msg):
290         params = msg.split("|")
291         instruction = int(params[0])
292         log_msg(self, params)
293         try:
294             if instruction == TRACE:
295                 reply = self.trace(params)
296             elif instruction == START:
297                 reply = self.start(params)
298             elif instruction == STOP:
299                 reply = self.stop(params)
300             elif instruction == SHUTDOWN:
301                 reply = self.shutdown(params)
302             elif instruction == CONFIGURE:
303                 reply = self.defer_configure(params)
304             elif instruction == CREATE:
305                 reply = self.defer_create(params)
306             elif instruction == CREATE_SET:
307                 reply = self.defer_create_set(params)
308             elif instruction == FACTORY_SET:
309                 reply = self.defer_factory_set(params)
310             elif instruction == CONNECT:
311                 reply = self.defer_connect(params)
312             elif instruction == CROSS_CONNECT:
313                 reply = self.defer_cross_connect(params)
314             elif instruction == ADD_TRACE:
315                 reply = self.defer_add_trace(params)
316             elif instruction == ADD_ADDRESS:
317                 reply = self.defer_add_address(params)
318             elif instruction == ADD_ROUTE:
319                 reply = self.defer_add_route(params)
320             elif instruction == DO_SETUP:
321                 reply = self.do_setup(params)
322             elif instruction == DO_CREATE:
323                 reply = self.do_create(params)
324             elif instruction == DO_CONNECT:
325                 reply = self.do_connect(params)
326             elif instruction == DO_CONFIGURE:
327                 reply = self.do_configure(params)
328             elif instruction == DO_CROSS_CONNECT:
329                 reply = self.do_cross_connect(params)
330             elif instruction == GET:
331                 reply = self.get(params)
332             elif instruction == SET:
333                 reply = self.set(params)
334             elif instruction == ACTION:
335                 reply = self.action(params)
336             elif instruction == STATUS:
337                 reply = self.status(params)
338             elif instruction == GUIDS:
339                 reply = self.guids(params)
340             else:
341                 error = "Invalid instruction %s" % instruction
342                 self.log_error(error)
343                 result = base64.b64encode(error)
344                 reply = "%d|%s" % (ERROR, result)
345         except:
346             error = self.log_error()
347             result = base64.b64encode(error)
348             reply = "%d|%s" % (ERROR, result)
349         log_reply(self, reply)
350         return reply
351
352     def guids(self, params):
353         guids = self._testbed.guids
354         guids = ",".join(map(str, guids))
355         result = base64.b64encode(guids)
356         return "%d|%s" % (OK, result)
357
358     def defer_create(self, params):
359         guid = int(params[1])
360         factory_id = params[2]
361         self._testbed.defer_create(guid, factory_id)
362         return "%d|%s" % (OK, "")
363
364     def trace(self, params):
365         guid = int(params[1])
366         trace_id = params[2]
367         trace = self._testbed.trace(guid, trace_id)
368         result = base64.b64encode(trace)
369         return "%d|%s" % (OK, result)
370
371     def start(self, params):
372         self._testbed.start()
373         return "%d|%s" % (OK, "")
374
375     def stop(self, params):
376         self._testbed.stop()
377         return "%d|%s" % (OK, "")
378
379     def shutdown(self, params):
380         self._testbed.shutdown()
381         return "%d|%s" % (OK, "")
382
383     def defer_configure(self, params):
384         name = base64.b64decode(params[1])
385         value = base64.b64decode(params[2])
386         type = int(params[3])
387         value = set_type(type, value)
388         self._testbed.defer_configure(name, value)
389         return "%d|%s" % (OK, "")
390
391     def defer_create_set(self, params):
392         guid = int(params[1])
393         name = base64.b64decode(params[2])
394         value = base64.b64decode(params[3])
395         type = int(params[4])
396         value = set_type(type, value)
397         self._testbed.defer_create_set(guid, name, value)
398         return "%d|%s" % (OK, "")
399
400     def defer_factory_set(self, params):
401         name = base64.b64decode(params[1])
402         value = base64.b64decode(params[2])
403         type = int(params[3])
404         value = set_type(type, value)
405         self._testbed.defer_factory_set(name, value)
406         return "%d|%s" % (OK, "")
407
408     def defer_connect(self, params):
409         guid1 = int(params[1])
410         connector_type_name1 = params[2]
411         guid2 = int(params[3])
412         connector_type_name2 = params[4]
413         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
414             connector_type_name2)
415         return "%d|%s" % (OK, "")
416
417     def defer_cross_connect(self, params):
418         guid = int(params[1])
419         connector_type_name = params[2]
420         cross_guid = int(params[3])
421         connector_type_name = params[4]
422         cross_guid = int(params[5])
423         cross_testbed_id = params[6]
424         cross_factory_id = params[7]
425         cross_connector_type_name = params[8]
426         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
427             cross_testbed_id, cross_factory_id, cross_connector_type_name)
428         return "%d|%s" % (OK, "")
429
430     def defer_add_trace(self, params):
431         guid = int(params[1])
432         trace_id = params[2]
433         self._testbed.defer_add_trace(guid, trace_id)
434         return "%d|%s" % (OK, "")
435
436     def defer_add_address(self, params):
437         guid = int(params[1])
438         address = params[2]
439         netprefix = int(params[3])
440         broadcast = params[4]
441         self._testbed.defer_add_address(guid, address, netprefix,
442                 broadcast)
443         return "%d|%s" % (OK, "")
444
445     def defer_add_route(self, params):
446         guid = int(params[1])
447         destination = params[2]
448         netprefix = int(params[3])
449         nexthop = params[4]
450         self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
451         return "%d|%s" % (OK, "")
452
453     def do_setup(self, params):
454         self._testbed.do_setup()
455         return "%d|%s" % (OK, "")
456
457     def do_create(self, params):
458         self._testbed.do_create()
459         return "%d|%s" % (OK, "")
460
461     def do_connect(self, params):
462         self._testbed.do_connect()
463         return "%d|%s" % (OK, "")
464
465     def do_configure(self, params):
466         self._testbed.do_configure()
467         return "%d|%s" % (OK, "")
468
469     def do_cross_connect(self, params):
470         self._testbed.do_cross_connect()
471         return "%d|%s" % (OK, "")
472
473     def get(self, params):
474         time = params[1]
475         guid = int(param[2] )
476         name = base64.b64decode(params[3])
477         value = self._testbed.get(time, guid, name)
478         result = base64.b64encode(str(value))
479         return "%d|%s" % (OK, result)
480
481     def set(self, params):
482         time = params[1]
483         guid = int(params[2])
484         name = base64.b64decode(params[3])
485         value = base64.b64decode(params[4])
486         type = int(params[3])
487         value = set_type(type, value)
488         self._testbed.set(time, guid, name, value)
489         return "%d|%s" % (OK, "")
490
491     def action(self, params):
492         time = params[1]
493         guid = int(params[2])
494         command = base64.b64decode(params[3])
495         self._testbed.action(time, guid, command)
496         return "%d|%s" % (OK, "")
497
498     def status(self, params):
499         guid = int(params[1])
500         status = self._testbed.status(guid)
501         result = base64.b64encode(str(status))
502         return "%d|%s" % (OK, result)
503  
504 class ExperimentControllerServer(server.Server):
505     def __init__(self, root_dir, log_level, experiment_xml):
506         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
507         self._experiment_xml = experiment_xml
508         self._controller = None
509
510     def post_daemonize(self):
511         from nepi.core.execute import ExperimentController
512         self._controller = ExperimentController(self._experiment_xml)
513
514     def reply_action(self, msg):
515         params = msg.split("|")
516         instruction = int(params[0])
517         log_msg(self, params)
518         try:
519             if instruction == XML:
520                 reply = self.experiment_xml(params)
521             elif instruction == ACCESS:
522                 reply = self.set_access_configuration(params)
523             elif instruction == TRACE:
524                 reply = self.trace(params)
525             elif instruction == FINISHED:
526                 reply = self.is_finished(params)
527             elif instruction == START:
528                 reply = self.start(params)
529             elif instruction == STOP:
530                 reply = self.stop(params)
531             elif instruction == SHUTDOWN:
532                 reply = self.shutdown(params)
533             else:
534                 error = "Invalid instruction %s" % instruction
535                 self.log_error(error)
536                 result = base64.b64encode(error)
537                 reply = "%d|%s" % (ERROR, result)
538         except:
539             error = self.log_error()
540             result = base64.b64encode(error)
541             reply = "%d|%s" % (ERROR, result)
542         log_reply(self, reply)
543         return reply
544
545     def experiment_xml(self, params):
546         xml = self._controller.experiment_xml
547         result = base64.b64encode(xml)
548         return "%d|%s" % (OK, result)
549
550     def set_access_configuration(self, params):
551         testbed_guid = int(params[1])
552         mode = params[2]
553         communication = params[3]
554         host = params[4]
555         user = params[5]
556         port = int(params[6])
557         root_dir = params[7]
558         use_agent = params[8] == "True"
559         log_level = params[9]
560         access_config = AccessConfiguration()
561         access_config.set_attribute_value("mode", mode)
562         access_config.set_attribute_value("communication", communication)
563         access_config.set_attribute_value("host", host)
564         access_config.set_attribute_value("user", user)
565         access_config.set_attribute_value("port", port)
566         access_config.set_attribute_value("rootDirectory", root_dir)
567         access_config.set_attribute_value("useAgent", use_agent)
568         access_config.set_attribute_value("logLevel", log_level)
569         self._controller.set_access_configuration(testbed_guid, 
570                 access_config)
571         return "%d|%s" % (OK, "")
572
573     def trace(self, params):
574         testbed_guid = int(params[1])
575         guid = int(params[2])
576         trace_id = params[3]
577         trace = self._controller.trace(testbed_guid, guid, trace_id)
578         result = base64.b64encode(trace)
579         return "%d|%s" % (OK, result)
580
581     def is_finished(self, params):
582         guid = int(params[1])
583         status = self._controller.is_finished(guid)
584         result = base64.b64encode(str(status))
585         return "%d|%s" % (OK, result)
586
587     def start(self, params):
588         self._controller.start()
589         return "%d|%s" % (OK, "")
590
591     def stop(self, params):
592         self._controller.stop()
593         return "%d|%s" % (OK, "")
594
595     def shutdown(self, params):
596         self._controller.shutdown()
597         return "%d|%s" % (OK, "")
598
599 class TestbedInstanceProxy(object):
600     def __init__(self, root_dir, log_level, testbed_id = None, 
601             testbed_version = None, launch = True, host = None, 
602             port = None, user = None, agent = None):
603         if launch:
604             if testbed_id == None or testbed_version == None:
605                 raise RuntimeError("To launch a TesbedInstance server a \
606                         testbed_id and testbed_version are required")
607             # ssh
608             if host != None:
609                 python_code = "from nepi.util.proxy import \
610                         TesbedInstanceServer;\
611                         s = TestbedInstanceServer('%s', %d, '%s', '%s');\
612                         s.run()" % (root_dir, log_level, testbed_id, 
613                                 testbed_version)
614                 self._client = launch_ssh_daemon_client(root_dir, python_code,
615                         host, port, user, agent)
616             else:
617                 # launch daemon
618                 s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
619                     testbed_version)
620                 s.run()
621                 # create client
622                 self._client = server.Client(root_dir)
623
624     @property
625     def guids(self):
626         msg = testbed_messages[GUIDS]
627         self._client.send_msg(msg)
628         reply = self._client.read_reply()
629         result = reply.split("|")
630         code = int(result[0])
631         text = base64.b64decode(result[1])
632         if code == ERROR:
633             raise RuntimeError(text)
634         return map(int, text.split(","))
635
636     def defer_configure(self, name, value):
637         msg = testbed_messages[CONFIGURE]
638         type = get_type(value)
639         # avoid having "|" in this parameters
640         name = base64.b64encode(name)
641         value = base64.b64encode(str(value))
642         msg = msg % (name, value, type)
643         self._client.send_msg(msg)
644         reply = self._client.read_reply()
645         result = reply.split("|")
646         code = int(result[0])
647         text = base64.b64decode(result[1])
648         if code == ERROR:
649             raise RuntimeError(text)
650
651     def defer_create(self, guid, factory_id):
652         msg = testbed_messages[CREATE]
653         msg = msg % (guid, factory_id)
654         self._client.send_msg(msg)
655         reply = self._client.read_reply()
656         result = reply.split("|")
657         code = int(result[0])
658         text = base64.b64decode(result[1])
659         if code == ERROR:
660             raise RuntimeError(text)
661
662     def defer_create_set(self, guid, name, value):
663         msg = testbed_messages[CREATE_SET]
664         type = get_type(value)
665         # avoid having "|" in this parameters
666         name = base64.b64encode(name)
667         value = base64.b64encode(str(value))
668         msg = msg % (guid, name, value, type)
669         self._client.send_msg(msg)
670         reply = self._client.read_reply()
671         result = reply.split("|")
672         code = int(result[0])
673         text = base64.b64decode(result[1])
674         if code == ERROR:
675             raise RuntimeError(text)
676
677     def defer_factory_set(self, guid, name, value):
678         msg = testbed_messages[FACTORY_SET]
679         type = get_type(value)
680         # avoid having "|" in this parameters
681         name = base64.b64encode(name)
682         value = base64.b64encode(str(value))
683         msg = msg % (guid, name, value, type)
684         self._client.send_msg(msg)
685         reply = self._client.read_reply()
686         result = reply.split("|")
687         code = int(result[0])
688         text = base64.b64decode(result[1])
689         if code == ERROR:
690             raise RuntimeError(text)
691
692     def defer_connect(self, guid1, connector_type_name1, guid2, 
693             connector_type_name2): 
694         msg = testbed_messages[CONNECT]
695         msg = msg % (guid1, connector_type_name1, guid2, 
696             connector_type_name2)
697         self._client.send_msg(msg)
698         reply = self._client.read_reply()
699         result = reply.split("|")
700         code = int(result[0])
701         text = base64.b64decode(result[1])
702         if code == ERROR:
703             raise RuntimeError(text)
704
705     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
706             cross_testbed_id, cross_factory_id, cross_connector_type_name):
707         msg = testbed_messages[CROSS_CONNECT]
708         msg = msg % (guid, connector_type_name, cross_guid, 
709             cross_testbed_id, cross_factory_id, cross_connector_type_name)
710         self._client.send_msg(msg)
711         reply = self._client.read_reply()
712         result = reply.split("|")
713         code = int(result[0])
714         text = base64.b64decode(result[1])
715         if code == ERROR:
716             raise RuntimeError(text)
717
718     def defer_add_trace(self, guid, trace_id):
719         msg = testbed_messages[ADD_TRACE]
720         msg = msg % (guid, trace_id)
721         self._client.send_msg(msg)
722         reply = self._client.read_reply()
723         result = reply.split("|")
724         code = int(result[0])
725         text = base64.b64decode(result[1])
726         if code == ERROR:
727             raise RuntimeError(text)
728
729     def defer_add_address(self, guid, address, netprefix, broadcast): 
730         msg = testbed_messages[ADD_ADDRESS]
731         msg = msg % (guid, address, netprefix, broadcast)
732         self._client.send_msg(msg)
733         reply = self._client.read_reply()
734         result = reply.split("|")
735         code = int(result[0])
736         text = base64.b64decode(result[1])
737         if code == ERROR:
738             raise RuntimeError(text)
739
740     def defer_add_route(self, guid, destination, netprefix, nexthop):
741         msg = testbed_messages[ADD_ROUTE]
742         msg = msg % (guid, destination, netprefix, nexthop)
743         self._client.send_msg(msg)
744         reply = self._client.read_reply()
745         result = reply.split("|")
746         code = int(result[0])
747         text = base64.b64decode(result[1])
748         if code == ERROR:
749             raise RuntimeError(text)
750
751     def do_setup(self):
752         msg = testbed_messages[DO_SETUP]
753         self._client.send_msg(msg)
754         reply = self._client.read_reply()
755         result = reply.split("|")
756         code = int(result[0])
757         text = base64.b64decode(result[1])
758         if code == ERROR:
759             raise RuntimeError(text)
760
761     def do_create(self):
762         msg = testbed_messages[DO_CREATE]
763         self._client.send_msg(msg)
764         reply = self._client.read_reply()
765         result = reply.split("|")
766         code = int(result[0])
767         text = base64.b64decode(result[1])
768         if code == ERROR:
769             raise RuntimeError(text)
770
771     def do_connect(self):
772         msg = testbed_messages[DO_CONNECT]
773         self._client.send_msg(msg)
774         reply = self._client.read_reply()
775         result = reply.split("|")
776         code = int(result[0])
777         text = base64.b64decode(result[1])
778         if code == ERROR:
779             raise RuntimeError(text)
780
781     def do_configure(self):
782         msg = testbed_messages[DO_CONFIGURE]
783         self._client.send_msg(msg)
784         reply = self._client.read_reply()
785         result = reply.split("|")
786         code = int(result[0])
787         text = base64.b64decode(result[1])
788         if code == ERROR:
789             raise RuntimeError(text)
790
791     def do_cross_connect(self):
792         msg = testbed_messages[DO_CROSS_CONNECT]
793         self._client.send_msg(msg)
794         reply = self._client.read_reply()
795         result = reply.split("|")
796         code = int(result[0])
797         text = base64.b64decode(result[1])
798         if code == ERROR:
799             raise RuntimeError(text)
800
801     def start(self, time = TIME_NOW):
802         msg = testbed_messages[START]
803         self._client.send_msg(msg)
804         reply = self._client.read_reply()
805         result = reply.split("|")
806         code = int(result[0])
807         text = base64.b64decode(result[1])
808         if code == ERROR:
809             raise RuntimeError(text)
810
811     def stop(self, time = TIME_NOW):
812         msg = testbed_messages[STOP]
813         self._client.send_msg(msg)
814         reply = self._client.read_reply()
815         result = reply.split("|")
816         code = int(result[0])
817         text = base64.b64decode(result[1])
818         if code == ERROR:
819             raise RuntimeError(text)
820
821     def set(self, time, guid, name, value):
822         msg = testbed_messages[SET]
823         type = get_type(value)
824         # avoid having "|" in this parameters
825         name = base64.b64encode(name)
826         value = base64.b64encode(str(value))
827         msg = msg % (time, guid, name, value, type)
828         self._client.send_msg(msg)
829         reply = self._client.read_reply()
830         result = reply.split("|")
831         code = int(result[0])
832         text = base64.b64decode(result[1])
833         if code == ERROR:
834             raise RuntimeError(text)
835
836     def get(self, time, guid, name):
837         msg = testbed_messages[GET]
838         # avoid having "|" in this parameters
839         name = base64.b64encode(name)
840         msg = msg % (time, guid, name)
841         self._client.send_msg(msg)
842         reply = self._client.read_reply()
843         result = reply.split("|")
844         code = int(result[0])
845         text = base64.b64decode(result[1])
846         if code == ERROR:
847             raise RuntimeError(text)
848         return text
849
850     def action(self, time, guid, action):
851         msg = testbed_messages[ACTION]
852         msg = msg % (time, guid, action)
853         self._client.send_msg(msg)
854         reply = self._client.read_reply()
855         result = reply.split("|")
856         code = int(result[0])
857         text = base64.b64decode(result[1])
858         if code == ERROR:
859             raise RuntimeError(text)
860
861     def status(self, guid):
862         msg = testbed_messages[STATUS]
863         msg = msg % (guid)
864         self._client.send_msg(msg)
865         reply = self._client.read_reply()
866         result = reply.split("|")
867         code = int(result[0])
868         text = base64.b64decode(result[1])
869         if code == ERROR:
870             raise RuntimeError(text)
871         return int(text)
872
873     def trace(self, guid, trace_id):
874         msg = testbed_messages[TRACE]
875         msg = msg % (guid, trace_id)
876         self._client.send_msg(msg)
877         reply = self._client.read_reply()
878         result = reply.split("|")
879         code = int(result[0])
880         text = base64.b64decode(result[1])
881         if code == ERROR:
882             raise RuntimeError(text)
883         return text
884
885     def shutdown(self):
886         msg = testbed_messages[SHUTDOWN]
887         self._client.send_msg(msg)
888         reply = self._client.read_reply()
889         result = reply.split("|")
890         code = int(result[0])
891         text = base64.b64decode(result[1])
892         if code == ERROR:
893             raise RuntimeError(text)
894         self._client.send_stop()
895
896 class ExperimentControllerProxy(object):
897     def __init__(self, root_dir, log_level, experiment_xml = None, 
898             launch = True, host = None, port = None, user = None, 
899             agent = None):
900         if launch:
901             # launch server
902             if experiment_xml == None:
903                 raise RuntimeError("To launch a ExperimentControllerServer a \
904                         xml description of the experiment is required")
905             # ssh
906             if host != None:
907                 xml = experiment_xml
908                 xml = xml.replace("'", r"\'")
909                 xml = xml.replace("\"", r"\'")
910                 xml = xml.replace("\n", r"")
911                 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
912                         s = ExperimentControllerServer('%s', %d, '%s');\
913                         s.run()" % (root_dir, log_level, xml)
914                 self._client = launch_ssh_daemon_client(root_dir, python_code,
915                         host, port, user, agent)
916             else:
917                 # launch daemon
918                 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
919                 s.run()
920                 # create client
921                 self._client = server.Client(root_dir)
922
923     @property
924     def experiment_xml(self):
925         msg = controller_messages[XML]
926         self._client.send_msg(msg)
927         reply = self._client.read_reply()
928         result = reply.split("|")
929         code = int(result[0])
930         text = base64.b64decode(result[1])
931         if code == ERROR:
932             raise RuntimeError(text)
933         return text
934
935     def set_access_configuration(self, testbed_guid, access_config):
936         mode = access_config.get_attribute_value("mode")
937         communication = access_config.get_attribute_value("communication")
938         host = access_config.get_attribute_value("host")
939         user = access_config.get_attribute_value("user")
940         port = access_config.get_attribute_value("port")
941         root_dir = access_config.get_attribute_value("rootDirectory")
942         use_agent = access_config.get_attribute_value("useAgent")
943         log_level = access_config.get_attribute_value("logLevel")
944         msg = controller_messages[ACCESS]
945         msg = msg % (testbed_guid, mode, communication, host, user, port, 
946                 root_dir, use_agent, log_level)
947         self._client.send_msg(msg)
948         reply = self._client.read_reply()
949         result = reply.split("|")
950         code = int(result[0])
951         text =  base64.b64decode(result[1])
952         if code == ERROR:
953             raise RuntimeError(text)
954
955     def trace(self, testbed_guid, guid, trace_id):
956         msg = controller_messages[TRACE]
957         msg = msg % (testbed_guid, guid, trace_id)
958         self._client.send_msg(msg)
959         reply = self._client.read_reply()
960         result = reply.split("|")
961         code = int(result[0])
962         text =  base64.b64decode(result[1])
963         if code == OK:
964             return text
965         raise RuntimeError(text)
966
967     def start(self):
968         msg = controller_messages[START]
969         self._client.send_msg(msg)
970         reply = self._client.read_reply()
971         result = reply.split("|")
972         code = int(result[0])
973         text =  base64.b64decode(result[1])
974         if code == ERROR:
975             raise RuntimeError(text)
976
977     def stop(self):
978         msg = controller_messages[STOP]
979         self._client.send_msg(msg)
980         reply = self._client.read_reply()
981         result = reply.split("|")
982         code = int(result[0])
983         text =  base64.b64decode(result[1])
984         if code == ERROR:
985             raise RuntimeError(text)
986
987     def is_finished(self, guid):
988         msg = controller_messages[FINISHED]
989         msg = msg % guid
990         self._client.send_msg(msg)
991         reply = self._client.read_reply()
992         result = reply.split("|")
993         code = int(result[0])
994         text = base64.b64decode(result[1])
995         if code == ERROR:
996             raise RuntimeError(text)
997         return text == "True"
998
999     def shutdown(self):
1000         msg = controller_messages[SHUTDOWN]
1001         self._client.send_msg(msg)
1002         reply = self._client.read_reply()
1003         result = reply.split("|")
1004         code = int(result[0])
1005         text =  base64.b64decode(result[1])
1006         if code == ERROR:
1007             raise RuntimeError(text)
1008         self._client.send_stop()
1009