server daemon launched over ssh connection.
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11
12 # PROTOCOL REPLIES
13 OK = 0
14 ERROR = 1
15
16 # PROTOCOL INSTRUCTION MESSAGES
17 XML = 2 
18 ACCESS  = 3
19 TRACE   = 4
20 FINISHED    = 5
21 START   = 6
22 STOP    = 7
23 SHUTDOWN    = 8
24 CONFIGURE   = 9
25 CREATE      = 10
26 CREATE_SET  = 11
27 FACTORY_SET = 12
28 CONNECT     = 13
29 CROSS_CONNECT   = 14
30 ADD_TRACE   = 15
31 ADD_ADDRESS = 16
32 ADD_ROUTE   = 17
33 DO_SETUP    = 18
34 DO_CREATE   = 19
35 DO_CONNECT  = 20
36 DO_CONFIGURE    = 21
37 DO_CROSS_CONNECT    = 22
38 GET = 23
39 SET = 24
40 ACTION  = 25
41 STATUS  = 26
42 GUIDS  = 27
43
44 # PARAMETER TYPE
45 STRING  =  100
46 INTEGER = 101
47 BOOL    = 102
48 FLOAT   = 103
49
50 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
51 controller_messages = dict({
52     XML:    "%d" % XML,
53     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
54     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
55     FINISHED:   "%d|%s" % (FINISHED, "%d"),
56     START:  "%d" % START,
57     STOP:   "%d" % STOP,
58     SHUTDOWN:   "%d" % SHUTDOWN,
59     })
60
61 # TESTBED INSTANCE PROTOCOL MESSAGES
62 testbed_messages = dict({
63     TRACE:  "%d|%s" % (TRACE, "%d|%s"),
64     START:  "%d|%s" % (START, "%s"),
65     STOP:    "%d|%s" % (STOP, "%s"),
66     SHUTDOWN:   "%d" % SHUTDOWN,
67     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
68     CREATE: "%d|%s" % (CREATE, "%d|%s"),
69     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
70     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
71     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
72     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
73     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
74     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
75     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
76     DO_SETUP:   "%d" % DO_SETUP,
77     DO_CREATE:  "%d" % DO_CREATE,
78     DO_CONNECT: "%d" % DO_CONNECT,
79     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
80     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
81     GET:    "%d|%s" % (GET, "%s|%d|%s"),
82     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
83     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
84     STATUS: "%d|%s" % (STATUS, "%d"),
85     GUIDS:  "%d" % GUIDS,
86     })
87
88 instruction_text = dict({
89     OK:     "OK",
90     ERROR:  "ERROR",
91     XML:    "XML",
92     ACCESS: "ACCESS",
93     TRACE:  "TRACE",
94     FINISHED:   "FINISHED",
95     START:  "START",
96     STOP:   "STOP",
97     SHUTDOWN:   "SHUTDOWN",
98     CONFIGURE:  "CONFIGURE",
99     CREATE: "CREATE",
100     CREATE_SET: "CREATE_SET",
101     FACTORY_SET:    "FACTORY_SET",
102     CONNECT:    "CONNECT",
103     CROSS_CONNECT: "CROSS_CONNECT",
104     ADD_TRACE:  "ADD_TRACE",
105     ADD_ADDRESS:    "ADD_ADDRESS",
106     ADD_ROUTE:  "ADD_ROUTE",
107     DO_SETUP:   "DO_SETUP",
108     DO_CREATE:  "DO_CREATE",
109     DO_CONNECT: "DO_CONNECT",
110     DO_CONFIGURE:   "DO_CONFIGURE",
111     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
112     GET:    "GET",
113     SET:    "SET",
114     ACTION: "ACTION",
115     STATUS: "STATUS",
116     GUIDS:  "GUIDS",
117     STRING: "STRING",
118     INTEGER:    "INTEGER",
119     BOOL:   "BOOL",
120     FLOAT:  "FLOAT"
121     })
122
123 def get_type(value):
124     if isinstance(value, bool):
125         return BOOL
126     elif isinstance(value, int):
127         return INTEGER
128     elif isinstance(value, float):
129         return FLOAT
130     else:
131         return STRING
132
133 def set_type(type, value):
134     if type == INTEGER:
135         value = int(value)
136     elif type == FLOAT:
137         value = float(value)
138     elif type == BOOL:
139         value = value == "True"
140     else:
141         value = str(value)
142     return value
143
144 def log_msg(server, params):
145     instr = int(params[0])
146     instr_txt = instruction_text[instr]
147     server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
148         instr_txt, ", ".join(map(str, params[1:]))))
149
150 def log_reply(server, reply):
151     res = reply.split("|")
152     code = int(res[0])
153     code_txt = instruction_text[code]
154     txt = base64.b64decode(res[1])
155     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
156             code_txt, txt))
157
158 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
159     # launch daemon
160     proc = server.popen_ssh_subprocess(python_code, host = host,
161         port = port, user = user, agent = agent)
162     #while not proc.poll():
163     #    time.sleep(0.5)
164     if proc.poll():
165         err = proc.stderr.read()
166         raise RuntimeError("Client could not be executed: %s" % \
167                 err)
168     # create client
169     return server.Client(root_dir, host = host, port = port, user = user, 
170             agent = agent)
171
172 def to_server_log_level(log_level):
173     return server.DEBUG_LEVEL \
174             if log_level == AccessConfiguration.DEBUG_LEVEL \
175                 else server.ERROR_LEVEL
176
177 def get_access_config_params(access_config):
178     root_dir = access_config.get_attribute_value("rootDirectory")
179     log_level = access_config.get_attribute_value("logLevel")
180     log_level = to_server_log_level(log_level)
181     user = host = port = agent = None
182     communication = access_config.get_attribute_value("communication")
183     if communication == AccessConfiguration.ACCESS_SSH:
184         user = access_config.get_attribute_value("user")
185         host = access_config.get_attribute_value("host")
186         port = access_config.get_attribute_value("port")
187         agent = access_config.get_attribute_value("useAgent")
188     return (root_dir, log_level, user, host, port, agent)
189
190 class AccessConfiguration(AttributesMap):
191     MODE_SINGLE_PROCESS = "SINGLE"
192     MODE_DAEMON = "DAEMON"
193     ACCESS_SSH = "SSH"
194     ACCESS_LOCAL = "LOCAL"
195     ERROR_LEVEL = "Error"
196     DEBUG_LEVEL = "Debug"
197
198     def __init__(self):
199         super(AccessConfiguration, self).__init__()
200         self.add_attribute(name = "mode",
201                 help = "Instance execution mode",
202                 type = Attribute.ENUM,
203                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
204                 allowed = [AccessConfiguration.MODE_DAEMON,
205                     AccessConfiguration.MODE_SINGLE_PROCESS],
206                 validation_function = validation.is_enum)
207         self.add_attribute(name = "communication",
208                 help = "Instance communication mode",
209                 type = Attribute.ENUM,
210                 value = AccessConfiguration.ACCESS_LOCAL,
211                 allowed = [AccessConfiguration.ACCESS_LOCAL,
212                     AccessConfiguration.ACCESS_SSH],
213                 validation_function = validation.is_enum)
214         self.add_attribute(name = "host",
215                 help = "Host where the testbed will be executed",
216                 type = Attribute.STRING,
217                 value = "localhost",
218                 validation_function = validation.is_string)
219         self.add_attribute(name = "user",
220                 help = "User on the Host to execute the testbed",
221                 type = Attribute.STRING,
222                 value = getpass.getuser(),
223                 validation_function = validation.is_string)
224         self.add_attribute(name = "port",
225                 help = "Port on the Host",
226                 type = Attribute.INTEGER,
227                 value = 22,
228                 validation_function = validation.is_integer)
229         self.add_attribute(name = "rootDirectory",
230                 help = "Root directory for storing process files",
231                 type = Attribute.STRING,
232                 value = ".",
233                 validation_function = validation.is_string) # TODO: validation.is_path
234         self.add_attribute(name = "useAgent",
235                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
236                 type = Attribute.BOOL,
237                 value = False,
238                 validation_function = validation.is_bool)
239         self.add_attribute(name = "logLevel",
240                 help = "Log level for instance",
241                 type = Attribute.ENUM,
242                 value = AccessConfiguration.ERROR_LEVEL,
243                 allowed = [AccessConfiguration.ERROR_LEVEL,
244                     AccessConfiguration.DEBUG_LEVEL],
245                 validation_function = validation.is_enum)
246
247 def create_controller(xml, access_config = None):
248     mode = None if not access_config else \
249             access_config.get_attribute_value("mode")
250     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
251         from nepi.core.execute import ExperimentController
252         return ExperimentController(xml)
253     elif mode == AccessConfiguration.MODE_DAEMON:
254         (root_dir, log_level, user, host, port, agent) = \
255                 get_access_config_params(access_config)
256         return ExperimentControllerProxy(root_dir, log_level,
257                 experiment_xml = xml, host = host, port = port, user = user, 
258                 agent = agent)
259     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
260
261 def create_testbed_instance(testbed_id, testbed_version, access_config):
262     mode = None if not access_config else access_config.get_attribute_value("mode")
263     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
264         return  _build_testbed_instance(testbed_id, testbed_version)
265     elif mode == AccessConfiguration.MODE_DAEMON:
266         (root_dir, log_level, user, host, port, agent) = \
267                 get_access_config_params(access_config)
268         return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, 
269                 testbed_version = testbed_version, host = host, port = port,
270                 user = user, agent = agent)
271     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
272
273 def _build_testbed_instance(testbed_id, testbed_version):
274     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
275     if not mod_name in sys.modules:
276         __import__(mod_name)
277     module = sys.modules[mod_name]
278     return module.TestbedInstance(testbed_version)
279
280 class TestbedInstanceServer(server.Server):
281     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
282         super(TestbedInstanceServer, self).__init__(root_dir, log_level)
283         self._testbed_id = testbed_id
284         self._testbed_version = testbed_version
285         self._testbed = None
286
287     def post_daemonize(self):
288         self._testbed = _build_testbed_instance(self._testbed_id, 
289                 self._testbed_version)
290
291     def reply_action(self, msg):
292         params = msg.split("|")
293         instruction = int(params[0])
294         log_msg(self, params)
295         try:
296             if instruction == TRACE:
297                 reply = self.trace(params)
298             elif instruction == START:
299                 reply = self.start(params)
300             elif instruction == STOP:
301                 reply = self.stop(params)
302             elif instruction == SHUTDOWN:
303                 reply = self.shutdown(params)
304             elif instruction == CONFIGURE:
305                 reply = self.configure(params)
306             elif instruction == CREATE:
307                 reply = self.create(params)
308             elif instruction == CREATE_SET:
309                 reply = self.create_set(params)
310             elif instruction == FACTORY_SET:
311                 reply = self.factory_set(params)
312             elif instruction == CONNECT:
313                 reply = self.connect(params)
314             elif instruction == CROSS_CONNECT:
315                 reply = self.cross_connect(params)
316             elif instruction == ADD_TRACE:
317                 reply = self.add_trace(params)
318             elif instruction == ADD_ADDRESS:
319                 reply = self.add_address(params)
320             elif instruction == ADD_ROUTE:
321                 reply = self.add_route(params)
322             elif instruction == DO_SETUP:
323                 reply = self.do_setup(params)
324             elif instruction == DO_CREATE:
325                 reply = self.do_create(params)
326             elif instruction == DO_CONNECT:
327                 reply = self.do_connect(params)
328             elif instruction == DO_CONFIGURE:
329                 reply = self.do_configure(params)
330             elif instruction == DO_CROSS_CONNECT:
331                 reply = self.do_cross_connect(params)
332             elif instruction == GET:
333                 reply = self.get(params)
334             elif instruction == SET:
335                 reply = self.set(params)
336             elif instruction == ACTION:
337                 reply = self.action(params)
338             elif instruction == STATUS:
339                 reply = self.status(params)
340             elif instruction == GUIDS:
341                 reply = self.guids(params)
342             else:
343                 error = "Invalid instruction %s" % instruction
344                 self.log_error(error)
345                 result = base64.b64encode(error)
346                 reply = "%d|%s" % (ERROR, result)
347         except:
348             error = self.log_error()
349             result = base64.b64encode(error)
350             reply = "%d|%s" % (ERROR, result)
351         log_reply(self, reply)
352         return reply
353
354     def guids(self, params):
355         guids = self._testbed.guids
356         guids = ",".join(map(str, guids))
357         result = base64.b64encode(guids)
358         return "%d|%s" % (OK, result)
359
360     def create(self, params):
361         guid = int(params[1])
362         factory_id = params[2]
363         self._testbed.create(guid, factory_id)
364         return "%d|%s" % (OK, "")
365
366     def trace(self, params):
367         guid = int(params[1])
368         trace_id = params[2]
369         trace = self._testbed.trace(guid, trace_id)
370         result = base64.b64encode(trace)
371         return "%d|%s" % (OK, result)
372
373     def start(self, params):
374         time = params[1]
375         self._testbed.start(time)
376         return "%d|%s" % (OK, "")
377
378     def stop(self, params):
379         time = params[1]
380         self._testbed.stop(time)
381         return "%d|%s" % (OK, "")
382
383     def shutdown(self, params):
384         self._testbed.shutdown()
385         return "%d|%s" % (OK, "")
386
387     def configure(self, params):
388         name = base64.b64decode(params[1])
389         value = base64.b64decode(params[2])
390         type = int(params[3])
391         value = set_type(type, value)
392         self._testbed.configure(name, value)
393         return "%d|%s" % (OK, "")
394
395     def create_set(self, params):
396         guid = int(params[1])
397         name = base64.b64decode(params[2])
398         value = base64.b64decode(params[3])
399         type = int(params[4])
400         value = set_type(type, value)
401         self._testbed.create_set(guid, name, value)
402         return "%d|%s" % (OK, "")
403
404     def factory_set(self, params):
405         name = base64.b64decode(params[1])
406         value = base64.b64decode(params[2])
407         type = int(params[3])
408         value = set_type(type, value)
409         self._testbed.factory_set(name, value)
410         return "%d|%s" % (OK, "")
411
412     def connect(self, params):
413         guid1 = int(params[1])
414         connector_type_name1 = params[2]
415         guid2 = int(params[3])
416         connector_type_name2 = params[4]
417         self._testbed.connect(guid1, connector_type_name1, guid2, 
418             connector_type_name2)
419         return "%d|%s" % (OK, "")
420
421     def cross_connect(self, params):
422         guid = int(params[1])
423         connector_type_name = params[2]
424         cross_guid = int(params[3])
425         connector_type_name = params[4]
426         cross_guid = int(params[5])
427         cross_testbed_id = params[6]
428         cross_factory_id = params[7]
429         cross_connector_type_name = params[8]
430         self._testbed.cross_connect(guid, connector_type_name, cross_guid, 
431             cross_testbed_id, cross_factory_id, cross_connector_type_name)
432         return "%d|%s" % (OK, "")
433
434     def add_trace(self, params):
435         guid = int(params[1])
436         trace_id = params[2]
437         self._testbed.add_trace(guid, trace_id)
438         return "%d|%s" % (OK, "")
439
440     def add_address(self, params):
441         guid = int(params[1])
442         family = int(params[2])
443         address = params[3]
444         netprefix = int(params[4])
445         broadcast = params[5]
446         self._testbed.add_address(guid, family, address, netprefix,
447                 broadcast)
448         return "%d|%s" % (OK, "")
449
450     def add_route(self, params):
451         guid = int(params[1])
452         destination = params[2]
453         netprefix = int(params[3])
454         nexthop = params[4]
455         self._testbed.add_route(guid, destination, netprefix, nexthop)
456         return "%d|%s" % (OK, "")
457
458     def do_setup(self, params):
459         self._testbed.do_setup()
460         return "%d|%s" % (OK, "")
461
462     def do_create(self, params):
463         self._testbed.do_create()
464         return "%d|%s" % (OK, "")
465
466     def do_connect(self, params):
467         self._testbed.do_connect()
468         return "%d|%s" % (OK, "")
469
470     def do_configure(self, params):
471         self._testbed.do_configure()
472         return "%d|%s" % (OK, "")
473
474     def do_cross_connect(self, params):
475         self._testbed.do_cross_connect()
476         return "%d|%s" % (OK, "")
477
478     def get(self, params):
479         time = params[1]
480         guid = int(param[2] )
481         name = base64.b64decode(params[3])
482         value = self._testbed.get(time, guid, name)
483         result = base64.b64encode(str(value))
484         return "%d|%s" % (OK, result)
485
486     def set(self, params):
487         time = params[1]
488         guid = int(params[2])
489         name = base64.b64decode(params[3])
490         value = base64.b64decode(params[4])
491         type = int(params[3])
492         value = set_type(type, value)
493         self._testbed.set(time, guid, name, value)
494         return "%d|%s" % (OK, "")
495
496     def action(self, params):
497         time = params[1]
498         guid = int(params[2])
499         command = base64.b64decode(params[3])
500         self._testbed.action(time, guid, command)
501         return "%d|%s" % (OK, "")
502
503     def status(self, params):
504         guid = int(params[1])
505         status = self._testbed.status(guid)
506         result = base64.b64encode(str(status))
507         return "%d|%s" % (OK, result)
508  
509 class ExperimentControllerServer(server.Server):
510     def __init__(self, root_dir, log_level, experiment_xml):
511         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
512         self._experiment_xml = experiment_xml
513         self._controller = None
514
515     def post_daemonize(self):
516         from nepi.core.execute import ExperimentController
517         self._controller = ExperimentController(self._experiment_xml)
518
519     def reply_action(self, msg):
520         params = msg.split("|")
521         instruction = int(params[0])
522         log_msg(self, params)
523         try:
524             if instruction == XML:
525                 reply = self.experiment_xml(params)
526             elif instruction == ACCESS:
527                 reply = self.set_access_configuration(params)
528             elif instruction == TRACE:
529                 reply = self.trace(params)
530             elif instruction == FINISHED:
531                 reply = self.is_finished(params)
532             elif instruction == START:
533                 reply = self.start(params)
534             elif instruction == STOP:
535                 reply = self.stop(params)
536             elif instruction == SHUTDOWN:
537                 reply = self.shutdown(params)
538             else:
539                 error = "Invalid instruction %s" % instruction
540                 self.log_error(error)
541                 result = base64.b64encode(error)
542                 reply = "%d|%s" % (ERROR, result)
543         except:
544             error = self.log_error()
545             result = base64.b64encode(error)
546             reply = "%d|%s" % (ERROR, result)
547         log_reply(self, reply)
548         return reply
549
550     def experiment_xml(self, params):
551         xml = self._controller.experiment_xml
552         result = base64.b64encode(xml)
553         return "%d|%s" % (OK, result)
554
555     def set_access_configuration(self, params):
556         testbed_guid = int(params[1])
557         mode = params[2]
558         communication = params[3]
559         host = params[4]
560         user = params[5]
561         port = int(params[6])
562         root_dir = params[7]
563         use_agent = params[8] == "True"
564         log_level = params[9]
565         access_config = AccessConfiguration()
566         access_config.set_attribute_value("mode", mode)
567         access_config.set_attribute_value("communication", communication)
568         access_config.set_attribute_value("host", host)
569         access_config.set_attribute_value("user", user)
570         access_config.set_attribute_value("port", port)
571         access_config.set_attribute_value("rootDirectory", root_dir)
572         access_config.set_attribute_value("useAgent", use_agent)
573         access_config.set_attribute_value("logLevel", log_level)
574         self._controller.set_access_configuration(testbed_guid, 
575                 access_config)
576         return "%d|%s" % (OK, "")
577
578     def trace(self, params):
579         testbed_guid = int(params[1])
580         guid = int(params[2])
581         trace_id = params[3]
582         trace = self._controller.trace(testbed_guid, guid, trace_id)
583         result = base64.b64encode(trace)
584         return "%d|%s" % (OK, result)
585
586     def is_finished(self, params):
587         guid = int(params[1])
588         status = self._controller.is_finished(guid)
589         result = base64.b64encode(str(status))
590         return "%d|%s" % (OK, result)
591
592     def start(self, params):
593         self._controller.start()
594         return "%d|%s" % (OK, "")
595
596     def stop(self, params):
597         self._controller.stop()
598         return "%d|%s" % (OK, "")
599
600     def shutdown(self, params):
601         self._controller.shutdown()
602         return "%d|%s" % (OK, "")
603
604 class TestbedIntanceProxy(object):
605     def __init__(self, root_dir, log_level, testbed_id = None, 
606             testbed_version = None, launch = True, host = None, 
607             port = None, user = None, agent = None):
608         if launch:
609             if testbed_id == None or testbed_version == None:
610                 raise RuntimeError("To launch a TesbedInstance server a \
611                         testbed_id and testbed_version are required")
612             # ssh
613             if host != None:
614                 python_code = "from nepi.util.proxy import \
615                         TesbedInstanceServer;\
616                         s = TestbedInstanceServer('%s', %d, '%s', '%s');\
617                         s.run()" % (root_dir, log_level, testbed_id, 
618                                 testbed_version)
619                 self._client = launch_ssh_daemon_client(root_dir, python_code,
620                         host, port, user, agent)
621             else:
622                 # launch daemon
623                 s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
624                     testbed_version)
625                 s.run()
626                 # create client
627                 self._client = server.Client(root_dir)
628
629     @property
630     def guids(self):
631         msg = testbed_messages[GUIDS]
632         self._client.send_msg(msg)
633         reply = self._client.read_reply()
634         result = reply.split("|")
635         code = int(result[0])
636         text = base64.b64decode(result[1])
637         if code == ERROR:
638             raise RuntimeError(text)
639         return map(int, text.split(","))
640
641     def configure(self, name, value):
642         msg = testbed_messages[CONFIGURE]
643         type = get_type(value)
644         # avoid having "|" in this parameters
645         name = base64.b64encode(name)
646         value = base64.b64encode(str(value))
647         msg = msg % (name, value, type)
648         self._client.send_msg(msg)
649         reply = self._client.read_reply()
650         result = reply.split("|")
651         code = int(result[0])
652         text = base64.b64decode(result[1])
653         if code == ERROR:
654             raise RuntimeError(text)
655
656     def create(self, guid, factory_id):
657         msg = testbed_messages[CREATE]
658         msg = msg % (guid, factory_id)
659         self._client.send_msg(msg)
660         reply = self._client.read_reply()
661         result = reply.split("|")
662         code = int(result[0])
663         text = base64.b64decode(result[1])
664         if code == ERROR:
665             raise RuntimeError(text)
666
667     def create_set(self, guid, name, value):
668         msg = testbed_messages[CREATE_SET]
669         type = get_type(value)
670         # avoid having "|" in this parameters
671         name = base64.b64encode(name)
672         value = base64.b64encode(str(value))
673         msg = msg % (guid, name, value, type)
674         self._client.send_msg(msg)
675         reply = self._client.read_reply()
676         result = reply.split("|")
677         code = int(result[0])
678         text = base64.b64decode(result[1])
679         if code == ERROR:
680             raise RuntimeError(text)
681
682     def factory_set(self, guid, name, value):
683         msg = testbed_messages[FACTORY_SET]
684         type = get_type(value)
685         # avoid having "|" in this parameters
686         name = base64.b64encode(name)
687         value = base64.b64encode(str(value))
688         msg = msg % (guid, name, value, type)
689         self._client.send_msg(msg)
690         reply = self._client.read_reply()
691         result = reply.split("|")
692         code = int(result[0])
693         text = base64.b64decode(result[1])
694         if code == ERROR:
695             raise RuntimeError(text)
696
697     def connect(self, guid1, connector_type_name1, guid2, 
698             connector_type_name2): 
699         msg = testbed_messages[CONNECT]
700         msg = msg % (guid1, connector_type_name1, guid2, 
701             connector_type_name2)
702         self._client.send_msg(msg)
703         reply = self._client.read_reply()
704         result = reply.split("|")
705         code = int(result[0])
706         text = base64.b64decode(result[1])
707         if code == ERROR:
708             raise RuntimeError(text)
709
710     def cross_connect(self, guid, connector_type_name, cross_guid, 
711             cross_testbed_id, cross_factory_id, cross_connector_type_name):
712         msg = testbed_messages[CROSS_CONNECT]
713         msg = msg % (guid, connector_type_name, cross_guid, 
714             cross_testbed_id, cross_factory_id, cross_connector_type_name)
715         self._client.send_msg(msg)
716         reply = self._client.read_reply()
717         result = reply.split("|")
718         code = int(result[0])
719         text = base64.b64decode(result[1])
720         if code == ERROR:
721             raise RuntimeError(text)
722
723     def add_trace(self, guid, trace_id):
724         msg = testbed_messages[ADD_TRACE]
725         msg = msg % (guid, trace_id)
726         self._client.send_msg(msg)
727         reply = self._client.read_reply()
728         result = reply.split("|")
729         code = int(result[0])
730         text = base64.b64decode(result[1])
731         if code == ERROR:
732             raise RuntimeError(text)
733
734     def add_address(self, guid, family, address, netprefix, broadcast): 
735         msg = testbed_messages[ADD_ADDRESS]
736         msg = msg % (guid, family, address, netprefix, broadcast)
737         self._client.send_msg(msg)
738         reply = self._client.read_reply()
739         result = reply.split("|")
740         code = int(result[0])
741         text = base64.b64decode(result[1])
742         if code == ERROR:
743             raise RuntimeError(text)
744
745     def add_route(self, guid, destination, netprefix, nexthop):
746         msg = testbed_messages[ADD_ROUTE]
747         msg = msg % (guid, destination, netprefix, nexthop)
748         self._client.send_msg(msg)
749         reply = self._client.read_reply()
750         result = reply.split("|")
751         code = int(result[0])
752         text = base64.b64decode(result[1])
753         if code == ERROR:
754             raise RuntimeError(text)
755
756     def do_setup(self):
757         msg = testbed_messages[DO_SETUP]
758         self._client.send_msg(msg)
759         reply = self._client.read_reply()
760         result = reply.split("|")
761         code = int(result[0])
762         text = base64.b64decode(result[1])
763         if code == ERROR:
764             raise RuntimeError(text)
765
766     def do_create(self):
767         msg = testbed_messages[DO_CREATE]
768         self._client.send_msg(msg)
769         reply = self._client.read_reply()
770         result = reply.split("|")
771         code = int(result[0])
772         text = base64.b64decode(result[1])
773         if code == ERROR:
774             raise RuntimeError(text)
775
776     def do_connect(self):
777         msg = testbed_messages[DO_CONNECT]
778         self._client.send_msg(msg)
779         reply = self._client.read_reply()
780         result = reply.split("|")
781         code = int(result[0])
782         text = base64.b64decode(result[1])
783         if code == ERROR:
784             raise RuntimeError(text)
785
786     def do_configure(self):
787         msg = testbed_messages[DO_CONFIGURE]
788         self._client.send_msg(msg)
789         reply = self._client.read_reply()
790         result = reply.split("|")
791         code = int(result[0])
792         text = base64.b64decode(result[1])
793         if code == ERROR:
794             raise RuntimeError(text)
795
796     def do_cross_connect(self):
797         msg = testbed_messages[DO_CROSS_CONNECT]
798         self._client.send_msg(msg)
799         reply = self._client.read_reply()
800         result = reply.split("|")
801         code = int(result[0])
802         text = base64.b64decode(result[1])
803         if code == ERROR:
804             raise RuntimeError(text)
805
806     def start(self, time = TIME_NOW):
807         msg = testbed_messages[START]
808         msg = msg % (time)
809         self._client.send_msg(msg)
810         reply = self._client.read_reply()
811         result = reply.split("|")
812         code = int(result[0])
813         text = base64.b64decode(result[1])
814         if code == ERROR:
815             raise RuntimeError(text)
816
817     def stop(self, time = TIME_NOW):
818         msg = testbed_messages[STOP]
819         msg = msg % (time)
820         self._client.send_msg(msg)
821         reply = self._client.read_reply()
822         result = reply.split("|")
823         code = int(result[0])
824         text = base64.b64decode(result[1])
825         if code == ERROR:
826             raise RuntimeError(text)
827
828     def set(self, time, guid, name, value):
829         msg = testbed_messages[SET]
830         type = get_type(value)
831         # avoid having "|" in this parameters
832         name = base64.b64encode(name)
833         value = base64.b64encode(str(value))
834         msg = msg % (time, guid, name, value, type)
835         self._client.send_msg(msg)
836         reply = self._client.read_reply()
837         result = reply.split("|")
838         code = int(result[0])
839         text = base64.b64decode(result[1])
840         if code == ERROR:
841             raise RuntimeError(text)
842
843     def get(self, time, guid, name):
844         msg = testbed_messages[GET]
845         # avoid having "|" in this parameters
846         name = base64.b64encode(name)
847         msg = msg % (time, guid, name)
848         self._client.send_msg(msg)
849         reply = self._client.read_reply()
850         result = reply.split("|")
851         code = int(result[0])
852         text = base64.b64decode(result[1])
853         if code == ERROR:
854             raise RuntimeError(text)
855         return text
856
857     def action(self, time, guid, action):
858         msg = testbed_messages[ACTION]
859         msg = msg % (time, guid, action)
860         self._client.send_msg(msg)
861         reply = self._client.read_reply()
862         result = reply.split("|")
863         code = int(result[0])
864         text = base64.b64decode(result[1])
865         if code == ERROR:
866             raise RuntimeError(text)
867
868     def status(self, guid):
869         msg = testbed_messages[STATUS]
870         msg = msg % (guid)
871         self._client.send_msg(msg)
872         reply = self._client.read_reply()
873         result = reply.split("|")
874         code = int(result[0])
875         text = base64.b64decode(result[1])
876         if code == ERROR:
877             raise RuntimeError(text)
878         return int(text)
879
880     def trace(self, guid, trace_id):
881         msg = testbed_messages[TRACE]
882         msg = msg % (guid, trace_id)
883         self._client.send_msg(msg)
884         reply = self._client.read_reply()
885         result = reply.split("|")
886         code = int(result[0])
887         text = base64.b64decode(result[1])
888         if code == ERROR:
889             raise RuntimeError(text)
890         return text
891
892     def shutdown(self):
893         msg = testbed_messages[SHUTDOWN]
894         self._client.send_msg(msg)
895         reply = self._client.read_reply()
896         result = reply.split("|")
897         code = int(result[0])
898         text = base64.b64decode(result[1])
899         if code == ERROR:
900             raise RuntimeError(text)
901         self._client.send_stop()
902
903 class ExperimentControllerProxy(object):
904     def __init__(self, root_dir, log_level, experiment_xml = None, 
905             launch = True, host = None, port = None, user = None, 
906             agent = None):
907         if launch:
908             # launch server
909             if experiment_xml == None:
910                 raise RuntimeError("To launch a ExperimentControllerServer a \
911                         xml description of the experiment is required")
912             # ssh
913             if host != None:
914                 xml = experiment_xml
915                 xml = xml.replace("'", r"\'")
916                 xml = xml.replace("\"", r"\'")
917                 xml = xml.replace("\n", r"")
918                 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
919                         s = ExperimentControllerServer('%s', %d, '%s');\
920                         s.run()" % (root_dir, log_level, xml)
921                 self._client = launch_ssh_daemon_client(root_dir, python_code,
922                         host, port, user, agent)
923             else:
924                 # launch daemon
925                 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
926                 s.run()
927                 # create client
928                 self._client = server.Client(root_dir)
929
930     @property
931     def experiment_xml(self):
932         msg = controller_messages[XML]
933         self._client.send_msg(msg)
934         reply = self._client.read_reply()
935         result = reply.split("|")
936         code = int(result[0])
937         text = base64.b64decode(result[1])
938         if code == ERROR:
939             raise RuntimeError(text)
940         return text
941
942     def set_access_configuration(self, testbed_guid, access_config):
943         mode = access_config.get_attribute_value("mode")
944         communication = access_config.get_attribute_value("communication")
945         host = access_config.get_attribute_value("host")
946         user = access_config.get_attribute_value("user")
947         port = access_config.get_attribute_value("port")
948         root_dir = access_config.get_attribute_value("rootDirectory")
949         use_agent = access_config.get_attribute_value("useAgent")
950         log_level = access_config.get_attribute_value("logLevel")
951         msg = controller_messages[ACCESS]
952         msg = msg % (testbed_guid, mode, communication, host, user, port, 
953                 root_dir, use_agent, log_level)
954         self._client.send_msg(msg)
955         reply = self._client.read_reply()
956         result = reply.split("|")
957         code = int(result[0])
958         text =  base64.b64decode(result[1])
959         if code == ERROR:
960             raise RuntimeError(text)
961
962     def trace(self, testbed_guid, guid, trace_id):
963         msg = controller_messages[TRACE]
964         msg = msg % (testbed_guid, guid, trace_id)
965         self._client.send_msg(msg)
966         reply = self._client.read_reply()
967         result = reply.split("|")
968         code = int(result[0])
969         text =  base64.b64decode(result[1])
970         if code == OK:
971             return text
972         raise RuntimeError(text)
973
974     def start(self):
975         msg = controller_messages[START]
976         self._client.send_msg(msg)
977         reply = self._client.read_reply()
978         result = reply.split("|")
979         code = int(result[0])
980         text =  base64.b64decode(result[1])
981         if code == ERROR:
982             raise RuntimeError(text)
983
984     def stop(self):
985         msg = controller_messages[STOP]
986         self._client.send_msg(msg)
987         reply = self._client.read_reply()
988         result = reply.split("|")
989         code = int(result[0])
990         text =  base64.b64decode(result[1])
991         if code == ERROR:
992             raise RuntimeError(text)
993
994     def is_finished(self, guid):
995         msg = controller_messages[FINISHED]
996         msg = msg % guid
997         self._client.send_msg(msg)
998         reply = self._client.read_reply()
999         result = reply.split("|")
1000         code = int(result[0])
1001         text = base64.b64decode(result[1])
1002         if code == ERROR:
1003             raise RuntimeError(text)
1004         return text == "True"
1005
1006     def shutdown(self):
1007         msg = controller_messages[SHUTDOWN]
1008         self._client.send_msg(msg)
1009         reply = self._client.read_reply()
1010         result = reply.split("|")
1011         code = int(result[0])
1012         text =  base64.b64decode(result[1])
1013         if code == ERROR:
1014             raise RuntimeError(text)
1015         self._client.send_stop()
1016