b075efa2ad192fc5734e2d20b9fae0863d2e18f5
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11 import tempfile
12 import shutil
13
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
XML = 2
ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31

# PARAMETER TYPE
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103

# Every message template below is "<instruction code>|<field formats>": the
# code is substituted here, the remaining %-placeholders are filled in by the
# sender.  Fields are separated by "|".

# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
controller_messages = {
    XML: "%d" % XML,
    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
    TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
    FINISHED: "%d|%s" % (FINISHED, "%d"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    RECOVER: "%d" % RECOVER,
    SHUTDOWN: "%d" % SHUTDOWN,
    }

# TESTBED INSTANCE PROTOCOL MESSAGES
testbed_messages = {
    TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    SHUTDOWN: "%d" % SHUTDOWN,
    CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
    CREATE: "%d|%s" % (CREATE, "%d|%s"),
    CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
    FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
    CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
    CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
    ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
    ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
    ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
    DO_SETUP: "%d" % DO_SETUP,
    DO_CREATE: "%d" % DO_CREATE,
    DO_CONNECT: "%d" % DO_CONNECT,
    DO_CONFIGURE: "%d" % DO_CONFIGURE,
    DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
    DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
    GET: "%d|%s" % (GET, "%s|%d|%s"),
    SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
    # These two must carry their own instruction codes; using GET here made
    # the receiver dispatch them to the plain GET handler.
    GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
    GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
    ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
    STATUS: "%d|%s" % (STATUS, "%d"),
    GUIDS: "%d" % GUIDS,
    }

# Human-readable names for reply codes, instruction codes and parameter
# types; used when logging messages and replies.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT: "DO_CONNECT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
    }
136
def get_type(value):
    """Return the protocol parameter-type code describing ``value``.

    The result is BOOL, INTEGER or FLOAT for instances of bool, int and
    float respectively, and STRING for anything else.
    """
    # bool is tested first because isinstance(True, int) is also true.
    candidates = ((bool, BOOL), (int, INTEGER), (float, FLOAT))
    for python_type, code in candidates:
        if isinstance(value, python_type):
            return code
    return STRING
146
def set_type(type, value):
    """Convert ``value`` to the Python type named by the code ``type``.

    INTEGER and FLOAT go through int() and float(); BOOL is true only for
    the exact text "True"; every other code yields str(value).
    """
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    if type == BOOL:
        return value == "True"
    return str(value)
157
def log_msg(server, params):
    """Log, at debug level, a message that has been split into fields.

    ``params[0]`` is the numeric instruction code (looked up in
    ``instruction_text``); the remaining fields are logged as its arguments.
    """
    instruction_name = instruction_text[int(params[0])]
    arguments = ", ".join(str(field) for field in params[1:])
    server.log_debug("%s - msg: %s [%s]" % (
        server.__class__.__name__, instruction_name, arguments))
163
def log_reply(server, reply):
    """Log, at debug level, a "<code>|<base64 text>" reply string.

    The code is shown by its name from ``instruction_text`` and the text is
    shown base64-decoded.
    """
    fields = reply.split("|")
    status_name = instruction_text[int(fields[0])]
    payload = base64.b64decode(fields[1])
    server.log_debug("%s - reply: %s %s" % (
        server.__class__.__name__, status_name, payload))
171
def to_server_log_level(log_level):
    """Translate an AccessConfiguration log level into a server log level.

    Only AccessConfiguration.DEBUG_LEVEL maps to server.DEBUG_LEVEL; any
    other value maps to server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
176
def get_access_config_params(access_config):
    """Extract the connection parameters from an access configuration.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level is already converted with to_server_log_level().  The last
    four entries are None unless the "communication" attribute is
    AccessConfiguration.ACCESS_SSH.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") != AccessConfiguration.ACCESS_SSH:
        return (root_dir, log_level, None, None, None, None)
    user = read("user")
    host = read("host")
    port = read("port")
    agent = read("useAgent")
    return (root_dir, log_level, user, host, port, agent)
189
class AccessConfiguration(AttributesMap):
    """Attribute map holding the settings used to run and reach an instance.

    The constructor registers these attributes with their defaults: mode,
    communication, host, user, port, rootDirectory, useAgent, logLevel and
    recover.
    """
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        super(AccessConfiguration, self).__init__()
        cls = AccessConfiguration
        # One entry per attribute, registered in this order.
        attribute_specs = (
            dict(name = "mode",
                help = "Instance execution mode",
                type = Attribute.ENUM,
                value = cls.MODE_SINGLE_PROCESS,
                allowed = [cls.MODE_DAEMON, cls.MODE_SINGLE_PROCESS],
                validation_function = validation.is_enum),
            dict(name = "communication",
                help = "Instance communication mode",
                type = Attribute.ENUM,
                value = cls.ACCESS_LOCAL,
                allowed = [cls.ACCESS_LOCAL, cls.ACCESS_SSH],
                validation_function = validation.is_enum),
            dict(name = "host",
                help = "Host where the testbed will be executed",
                type = Attribute.STRING,
                value = "localhost",
                validation_function = validation.is_string),
            dict(name = "user",
                help = "User on the Host to execute the testbed",
                type = Attribute.STRING,
                value = getpass.getuser(),
                validation_function = validation.is_string),
            dict(name = "port",
                help = "Port on the Host",
                type = Attribute.INTEGER,
                value = 22,
                validation_function = validation.is_integer),
            # TODO: validation.is_path
            dict(name = "rootDirectory",
                help = "Root directory for storing process files",
                type = Attribute.STRING,
                value = ".",
                validation_function = validation.is_string),
            dict(name = "useAgent",
                help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
                type = Attribute.BOOL,
                value = False,
                validation_function = validation.is_bool),
            dict(name = "logLevel",
                help = "Log level for instance",
                type = Attribute.ENUM,
                value = cls.ERROR_LEVEL,
                allowed = [cls.ERROR_LEVEL, cls.DEBUG_LEVEL],
                validation_function = validation.is_enum),
            dict(name = "recover",
                help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
                type = Attribute.BOOL,
                value = False,
                validation_function = validation.is_bool),
        )
        for spec in attribute_specs:
            self.add_attribute(**spec)
251
class TempDir(object):
    """Owns a freshly created temporary directory.

    The directory is created with tempfile.mkdtemp() on construction, its
    location is exposed as ``path``, and it is removed (with its contents)
    when the object is destroyed.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()
    
    def __del__(self):
        # Cleanup is best effort: the destructor may run on an object whose
        # construction failed before ``path`` was set, or after the
        # directory has already been removed.
        path = getattr(self, "path", None)
        if path is not None:
            shutil.rmtree(path, ignore_errors = True)
258
class PermDir(object):
    """Holds a directory location in ``path`` and never deletes it."""

    def __init__(self, path):
        # Only the location is recorded; nothing is created on disk here.
        self.path = path
262
def create_controller(xml, access_config = None):
    """Build an experiment controller for ``xml``.

    Without an access configuration, or when its "mode" is
    AccessConfiguration.MODE_SINGLE_PROCESS, an in-process
    ExperimentController is returned.  Its root directory is the configured
    "rootDirectory" if the configuration has that attribute, otherwise a new
    temporary directory.  When "mode" is AccessConfiguration.MODE_DAEMON an
    ExperimentControllerProxy is returned instead.

    Raises ValueError if the "recover" attribute is set for single-process
    mode, and RuntimeError for any other mode value.
    """
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")
    else:
        mode = None
        launch = True

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        
        from nepi.core.execute import ExperimentController
        
        if not access_config or not access_config.has_attribute("rootDirectory"):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
        controller = ExperimentController(xml, root_dir.path)
        
        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir
        
        return controller
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user, 
                agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
292
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Build a testbed controller for the given testbed id and version.

    Without an access configuration, or when its "mode" is
    AccessConfiguration.MODE_SINGLE_PROCESS, the testbed controller is
    created in-process.  When "mode" is AccessConfiguration.MODE_DAEMON a
    TestbedControllerProxy is returned instead.

    Raises ValueError if the "recover" attribute is set for single-process
    mode, and RuntimeError for any other mode value.
    """
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")
    else:
        mode = None
        launch = True

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_instance(testbed_id, testbed_version)
    elif mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent, launch = launch)
    raise RuntimeError("Unsupported access configuration '%s'" % mode)
309
310 def _build_testbed_instance(testbed_id, testbed_version):
311     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
312     if not mod_name in sys.modules:
313         __import__(mod_name)
314     module = sys.modules[mod_name]
315     return module.TestbedController(testbed_version)
316
class TestbedControllerServer(server.Server):
    """Server that executes testbed protocol messages on a testbed instance.

    A message is a "|"-separated string whose first field is the numeric
    instruction code; the field layout of each instruction is given by
    ``testbed_messages``.  Every handler returns a reply of the form
    "<OK or ERROR>|<base64 text>".
    """

    # Instruction code -> name of the method that handles it.
    _HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "defer_configure",
        CREATE: "defer_create",
        CREATE_SET: "defer_create_set",
        FACTORY_SET: "defer_factory_set",
        CONNECT: "defer_connect",
        CROSS_CONNECT: "defer_cross_connect",
        ADD_TRACE: "defer_add_trace",
        ADD_ADDRESS: "defer_add_address",
        ADD_ROUTE: "defer_add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT: "do_connect",
        DO_CONFIGURE: "do_configure",
        DO_PRECONFIGURE: "do_preconfigure",
        DO_CROSS_CONNECT: "do_cross_connect",
        GET: "get",
        SET: "set",
        GET_ADDRESS: "get_address",
        GET_ROUTE: "get_route",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
    }

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # Created in post_daemonize().
        self._testbed = None

    def post_daemonize(self):
        """Create the testbed instance this server operates on."""
        self._testbed = _build_testbed_instance(self._testbed_id, 
                self._testbed_version)

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction or any failure while
        parsing or handling the message produces an ERROR reply.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            try:
                # Parsing happens inside the try block so that a malformed
                # message is answered with an ERROR reply as well.
                params = msg.split("|")
                instruction = int(params[0])
                handler_name = self._HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberate catch-all: whatever goes wrong is logged and
                # reported back to the client as an ERROR reply.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """Reply with the testbed guids as comma-separated text."""
        guids = ",".join(map(str, self._testbed.guids))
        result = base64.b64encode(guids)
        return "%d|%s" % (OK, result)

    def defer_create(self, params):
        """Handle CREATE: fields are guid, factory_id."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """Handle TRACE: fields are guid, trace_id, base64 attribute."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        trace = self._testbed.trace(guid, trace_id, attribute)
        result = base64.b64encode(trace)
        return "%d|%s" % (OK, result)

    def start(self, params):
        """Handle START."""
        self._testbed.start()
        return "%d|%s" % (OK, "")

    def stop(self, params):
        """Handle STOP."""
        self._testbed.stop()
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        """Handle SHUTDOWN."""
        self._testbed.shutdown()
        return "%d|%s" % (OK, "")

    def defer_configure(self, params):
        """Handle CONFIGURE: fields are base64 name, base64 value, type."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type_code = int(params[3])
        value = set_type(type_code, value)
        self._testbed.defer_configure(name, value)
        return "%d|%s" % (OK, "")

    def defer_create_set(self, params):
        """Handle CREATE_SET: guid, base64 name, base64 value, type."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type_code = int(params[4])
        value = set_type(type_code, value)
        self._testbed.defer_create_set(guid, name, value)
        return "%d|%s" % (OK, "")

    def defer_factory_set(self, params):
        """Handle FACTORY_SET: guid, base64 name, base64 value, type.

        The guid is the first field of the FACTORY_SET message, so it is
        parsed and forwarded together with the name and value.
        """
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type_code = int(params[4])
        value = set_type(type_code, value)
        self._testbed.defer_factory_set(guid, name, value)
        return "%d|%s" % (OK, "")

    def defer_connect(self, params):
        """Handle CONNECT: guid1, connector name 1, guid2, connector name 2."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
            connector_type_name2)
        return "%d|%s" % (OK, "")

    def defer_cross_connect(self, params):
        """Handle CROSS_CONNECT.

        The message carries six fields after the instruction code: guid,
        connector_type_name, cross_guid, cross_testbed_id, cross_factory_id
        and cross_connector_type_name.
        """
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        # NOTE(review): the CROSS_CONNECT template formats this field with
        # %d, but it is forwarded as received text here -- confirm which
        # type the testbed expects.
        cross_testbed_id = params[4]
        cross_factory_id = params[5]
        cross_connector_type_name = params[6]
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
        return "%d|%s" % (OK, "")

    def defer_add_trace(self, params):
        """Handle ADD_TRACE: fields are guid, trace_id."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return "%d|%s" % (OK, "")

    def defer_add_address(self, params):
        """Handle ADD_ADDRESS: guid, address, netprefix, broadcast."""
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)
        return "%d|%s" % (OK, "")

    def defer_add_route(self, params):
        """Handle ADD_ROUTE: guid, destination, netprefix, nexthop."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return "%d|%s" % (OK, "")

    def do_setup(self, params):
        """Handle DO_SETUP."""
        self._testbed.do_setup()
        return "%d|%s" % (OK, "")

    def do_create(self, params):
        """Handle DO_CREATE."""
        self._testbed.do_create()
        return "%d|%s" % (OK, "")

    def do_connect(self, params):
        """Handle DO_CONNECT."""
        self._testbed.do_connect()
        return "%d|%s" % (OK, "")

    def do_configure(self, params):
        """Handle DO_CONFIGURE."""
        self._testbed.do_configure()
        return "%d|%s" % (OK, "")

    def do_preconfigure(self, params):
        """Handle DO_PRECONFIGURE."""
        self._testbed.do_preconfigure()
        return "%d|%s" % (OK, "")

    def do_cross_connect(self, params):
        """Handle DO_CROSS_CONNECT."""
        self._testbed.do_cross_connect()
        return "%d|%s" % (OK, "")

    def get(self, params):
        """Handle GET: fields are time, guid, base64 name."""
        when = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = self._testbed.get(when, guid, name)
        result = base64.b64encode(str(value))
        return "%d|%s" % (OK, result)

    def set(self, params):
        """Handle SET: time, guid, base64 name, base64 value, type."""
        when = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        # The type code is the fifth field; params[3] is the encoded name.
        type_code = int(params[5])
        value = set_type(type_code, value)
        self._testbed.set(when, guid, name, value)
        return "%d|%s" % (OK, "")

    def get_address(self, params):
        """Handle GET_ADDRESS: fields are guid, index, base64 attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        result = base64.b64encode(str(value))
        return "%d|%s" % (OK, result)

    def get_route(self, params):
        """Handle GET_ROUTE: fields are guid, index, base64 attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        result = base64.b64encode(str(value))
        return "%d|%s" % (OK, result)

    def action(self, params):
        """Handle ACTION: fields are time, guid, base64 command."""
        when = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(when, guid, command)
        return "%d|%s" % (OK, "")

    def status(self, params):
        """Handle STATUS: the only field is the guid."""
        guid = int(params[1])
        status = self._testbed.status(guid)
        result = base64.b64encode(str(status))
        return "%d|%s" % (OK, result)
573  
class ExperimentControllerServer(server.Server):
    """Server that executes controller protocol messages on an experiment.

    A message is a "|"-separated string whose first field is the numeric
    instruction code; the field layout of each instruction is given by
    ``controller_messages``.  Every handler returns a reply of the form
    "<OK or ERROR>|<base64 text>".
    """

    # Instruction code -> name of the method that handles it.
    _HANDLERS = {
        XML: "experiment_xml",
        ACCESS: "set_access_configuration",
        TRACE: "trace",
        FINISHED: "is_finished",
        START: "start",
        STOP: "stop",
        RECOVER: "recover",
        SHUTDOWN: "shutdown",
    }

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # Created in post_daemonize().
        self._controller = None

    def post_daemonize(self):
        """Create the ExperimentController this server operates on."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml, 
            root_dir = self._root_dir)

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction or any failure while
        parsing or handling the message produces an ERROR reply.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            try:
                # Parsing happens inside the try block so that a malformed
                # message is answered with an ERROR reply as well.
                params = msg.split("|")
                instruction = int(params[0])
                handler_name = self._HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberate catch-all: whatever goes wrong is logged and
                # reported back to the client as an ERROR reply.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """Handle XML: reply with the controller's experiment_xml."""
        xml = self._controller.experiment_xml
        result = base64.b64encode(xml)
        return "%d|%s" % (OK, result)

    def set_access_configuration(self, params):
        """Handle ACCESS.

        Fields are testbed_guid, mode, communication, host, user, port,
        root_dir, use_agent and log_level; they are loaded into a new
        AccessConfiguration that is handed to the controller.
        """
        testbed_guid = int(params[1])
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            # The sender formats the flag with %r, so True arrives as "True".
            ("useAgent", params[8] == "True"),
            ("logLevel", params[9]),
        )
        access_config = AccessConfiguration()
        for name, value in settings:
            access_config.set_attribute_value(name, value)
        self._controller.set_access_configuration(testbed_guid, 
                access_config)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """Handle TRACE: testbed_guid, guid, trace_id, base64 attribute."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
        result = base64.b64encode(trace)
        return "%d|%s" % (OK, result)

    def is_finished(self, params):
        """Handle FINISHED: the only field is the guid."""
        guid = int(params[1])
        status = self._controller.is_finished(guid)
        result = base64.b64encode(str(status))
        return "%d|%s" % (OK, result)

    def start(self, params):
        """Handle START."""
        self._controller.start()
        return "%d|%s" % (OK, "")

    def stop(self, params):
        """Handle STOP."""
        self._controller.stop()
        return "%d|%s" % (OK, "")

    def recover(self, params):
        """Handle RECOVER."""
        self._controller.recover()
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        """Handle SHUTDOWN."""
        self._controller.shutdown()
        return "%d|%s" % (OK, "")
680
681 class TestbedControllerProxy(object):
682     def __init__(self, root_dir, log_level, testbed_id = None, 
683             testbed_version = None, launch = True, host = None, 
684             port = None, user = None, agent = None):
685         if launch:
686             if testbed_id == None or testbed_version == None:
687                 raise RuntimeError("To launch a TesbedInstance server a \
688                         testbed_id and testbed_version are required")
689             # ssh
690             if host != None:
691                 python_code = "from nepi.util.proxy import \
692                         TesbedInstanceServer;\
693                         s = TestbedControllerServer('%s', %d, '%s', '%s');\
694                         s.run()" % (root_dir, log_level, testbed_id, 
695                                 testbed_version)
696                 proc = server.popen_ssh_subprocess(python_code, host = host,
697                     port = port, user = user, agent = agent)
698                 if proc.poll():
699                     err = proc.stderr.read()
700                     raise RuntimeError("Server could not be executed: %s" % \
701                             err)
702             else:
703                 # launch daemon
704                 s = TestbedControllerServer(root_dir, log_level, testbed_id, 
705                     testbed_version)
706                 s.run()
707
708         # connect client to server
709         self._client = server.Client(root_dir, host = host, port = port, 
710                 user = user, agent = agent)
711
712     @property
713     def guids(self):
714         msg = testbed_messages[GUIDS]
715         self._client.send_msg(msg)
716         reply = self._client.read_reply()
717         result = reply.split("|")
718         code = int(result[0])
719         text = base64.b64decode(result[1])
720         if code == ERROR:
721             raise RuntimeError(text)
722         return map(int, text.split(","))
723
724     def defer_configure(self, name, value):
725         msg = testbed_messages[CONFIGURE]
726         type = get_type(value)
727         # avoid having "|" in this parameters
728         name = base64.b64encode(name)
729         value = base64.b64encode(str(value))
730         msg = msg % (name, value, type)
731         self._client.send_msg(msg)
732         reply = self._client.read_reply()
733         result = reply.split("|")
734         code = int(result[0])
735         text = base64.b64decode(result[1])
736         if code == ERROR:
737             raise RuntimeError(text)
738
739     def defer_create(self, guid, factory_id):
740         msg = testbed_messages[CREATE]
741         msg = msg % (guid, factory_id)
742         self._client.send_msg(msg)
743         reply = self._client.read_reply()
744         result = reply.split("|")
745         code = int(result[0])
746         text = base64.b64decode(result[1])
747         if code == ERROR:
748             raise RuntimeError(text)
749
750     def defer_create_set(self, guid, name, value):
751         msg = testbed_messages[CREATE_SET]
752         type = get_type(value)
753         # avoid having "|" in this parameters
754         name = base64.b64encode(name)
755         value = base64.b64encode(str(value))
756         msg = msg % (guid, name, value, type)
757         self._client.send_msg(msg)
758         reply = self._client.read_reply()
759         result = reply.split("|")
760         code = int(result[0])
761         text = base64.b64decode(result[1])
762         if code == ERROR:
763             raise RuntimeError(text)
764
765     def defer_factory_set(self, guid, name, value):
766         msg = testbed_messages[FACTORY_SET]
767         type = get_type(value)
768         # avoid having "|" in this parameters
769         name = base64.b64encode(name)
770         value = base64.b64encode(str(value))
771         msg = msg % (guid, name, value, type)
772         self._client.send_msg(msg)
773         reply = self._client.read_reply()
774         result = reply.split("|")
775         code = int(result[0])
776         text = base64.b64decode(result[1])
777         if code == ERROR:
778             raise RuntimeError(text)
779
780     def defer_connect(self, guid1, connector_type_name1, guid2, 
781             connector_type_name2): 
782         msg = testbed_messages[CONNECT]
783         msg = msg % (guid1, connector_type_name1, guid2, 
784             connector_type_name2)
785         self._client.send_msg(msg)
786         reply = self._client.read_reply()
787         result = reply.split("|")
788         code = int(result[0])
789         text = base64.b64decode(result[1])
790         if code == ERROR:
791             raise RuntimeError(text)
792
793     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
794             cross_testbed_id, cross_factory_id, cross_connector_type_name):
795         msg = testbed_messages[CROSS_CONNECT]
796         msg = msg % (guid, connector_type_name, cross_guid, 
797             cross_testbed_id, cross_factory_id, cross_connector_type_name)
798         self._client.send_msg(msg)
799         reply = self._client.read_reply()
800         result = reply.split("|")
801         code = int(result[0])
802         text = base64.b64decode(result[1])
803         if code == ERROR:
804             raise RuntimeError(text)
805
806     def defer_add_trace(self, guid, trace_id):
807         msg = testbed_messages[ADD_TRACE]
808         msg = msg % (guid, trace_id)
809         self._client.send_msg(msg)
810         reply = self._client.read_reply()
811         result = reply.split("|")
812         code = int(result[0])
813         text = base64.b64decode(result[1])
814         if code == ERROR:
815             raise RuntimeError(text)
816
817     def defer_add_address(self, guid, address, netprefix, broadcast): 
818         msg = testbed_messages[ADD_ADDRESS]
819         msg = msg % (guid, address, netprefix, broadcast)
820         self._client.send_msg(msg)
821         reply = self._client.read_reply()
822         result = reply.split("|")
823         code = int(result[0])
824         text = base64.b64decode(result[1])
825         if code == ERROR:
826             raise RuntimeError(text)
827
828     def defer_add_route(self, guid, destination, netprefix, nexthop):
829         msg = testbed_messages[ADD_ROUTE]
830         msg = msg % (guid, destination, netprefix, nexthop)
831         self._client.send_msg(msg)
832         reply = self._client.read_reply()
833         result = reply.split("|")
834         code = int(result[0])
835         text = base64.b64decode(result[1])
836         if code == ERROR:
837             raise RuntimeError(text)
838
839     def do_setup(self):
840         msg = testbed_messages[DO_SETUP]
841         self._client.send_msg(msg)
842         reply = self._client.read_reply()
843         result = reply.split("|")
844         code = int(result[0])
845         text = base64.b64decode(result[1])
846         if code == ERROR:
847             raise RuntimeError(text)
848
849     def do_create(self):
850         msg = testbed_messages[DO_CREATE]
851         self._client.send_msg(msg)
852         reply = self._client.read_reply()
853         result = reply.split("|")
854         code = int(result[0])
855         text = base64.b64decode(result[1])
856         if code == ERROR:
857             raise RuntimeError(text)
858
859     def do_connect(self):
860         msg = testbed_messages[DO_CONNECT]
861         self._client.send_msg(msg)
862         reply = self._client.read_reply()
863         result = reply.split("|")
864         code = int(result[0])
865         text = base64.b64decode(result[1])
866         if code == ERROR:
867             raise RuntimeError(text)
868
869     def do_configure(self):
870         msg = testbed_messages[DO_CONFIGURE]
871         self._client.send_msg(msg)
872         reply = self._client.read_reply()
873         result = reply.split("|")
874         code = int(result[0])
875         text = base64.b64decode(result[1])
876         if code == ERROR:
877             raise RuntimeError(text)
878
879     def do_preconfigure(self):
880         msg = testbed_messages[DO_PRECONFIGURE]
881         self._client.send_msg(msg)
882         reply = self._client.read_reply()
883         result = reply.split("|")
884         code = int(result[0])
885         text = base64.b64decode(result[1])
886         if code == ERROR:
887             raise RuntimeError(text)
888
889     def do_cross_connect(self):
890         msg = testbed_messages[DO_CROSS_CONNECT]
891         self._client.send_msg(msg)
892         reply = self._client.read_reply()
893         result = reply.split("|")
894         code = int(result[0])
895         text = base64.b64decode(result[1])
896         if code == ERROR:
897             raise RuntimeError(text)
898
899     def start(self, time = TIME_NOW):
900         msg = testbed_messages[START]
901         self._client.send_msg(msg)
902         reply = self._client.read_reply()
903         result = reply.split("|")
904         code = int(result[0])
905         text = base64.b64decode(result[1])
906         if code == ERROR:
907             raise RuntimeError(text)
908
909     def stop(self, time = TIME_NOW):
910         msg = testbed_messages[STOP]
911         self._client.send_msg(msg)
912         reply = self._client.read_reply()
913         result = reply.split("|")
914         code = int(result[0])
915         text = base64.b64decode(result[1])
916         if code == ERROR:
917             raise RuntimeError(text)
918
919     def set(self, time, guid, name, value):
920         msg = testbed_messages[SET]
921         type = get_type(value)
922         # avoid having "|" in this parameters
923         name = base64.b64encode(name)
924         value = base64.b64encode(str(value))
925         msg = msg % (time, guid, name, value, type)
926         self._client.send_msg(msg)
927         reply = self._client.read_reply()
928         result = reply.split("|")
929         code = int(result[0])
930         text = base64.b64decode(result[1])
931         if code == ERROR:
932             raise RuntimeError(text)
933
934     def get(self, time, guid, name):
935         msg = testbed_messages[GET]
936         # avoid having "|" in this parameters
937         name = base64.b64encode(name)
938         msg = msg % (time, guid, name)
939         self._client.send_msg(msg)
940         reply = self._client.read_reply()
941         result = reply.split("|")
942         code = int(result[0])
943         text = base64.b64decode(result[1])
944         if code == ERROR:
945             raise RuntimeError(text)
946         return text
947
948     def get_address(self, guid, index, attribute):
949         msg = testbed_messages[GET_ADDRESS]
950         # avoid having "|" in this parameters
951         attribute = base64.b64encode(attribute)
952         msg = msg % (guid, index, attribute)
953         self._client.send_msg(msg)
954         reply = self._client.read_reply()
955         result = reply.split("|")
956         code = int(result[0])
957         text = base64.b64decode(result[1])
958         if code == ERROR:
959             raise RuntimeError(text)
960         return text
961
962     def get_route(self, guid, index, attribute):
963         msg = testbed_messages[GET_ROUTE]
964         # avoid having "|" in this parameters
965         attribute = base64.b64encode(attribute)
966         msg = msg % (guid, index, attribute)
967         self._client.send_msg(msg)
968         reply = self._client.read_reply()
969         result = reply.split("|")
970         code = int(result[0])
971         text = base64.b64decode(result[1])
972         if code == ERROR:
973             raise RuntimeError(text)
974         return text
975
976     def action(self, time, guid, action):
977         msg = testbed_messages[ACTION]
978         msg = msg % (time, guid, action)
979         self._client.send_msg(msg)
980         reply = self._client.read_reply()
981         result = reply.split("|")
982         code = int(result[0])
983         text = base64.b64decode(result[1])
984         if code == ERROR:
985             raise RuntimeError(text)
986
987     def status(self, guid):
988         msg = testbed_messages[STATUS]
989         msg = msg % (guid)
990         self._client.send_msg(msg)
991         reply = self._client.read_reply()
992         result = reply.split("|")
993         code = int(result[0])
994         text = base64.b64decode(result[1])
995         if code == ERROR:
996             raise RuntimeError(text)
997         return int(text)
998
999     def trace(self, guid, trace_id, attribute='value'):
1000         msg = testbed_messages[TRACE]
1001         attribute = base64.b64encode(attribute)
1002         msg = msg % (guid, trace_id, attribute)
1003         self._client.send_msg(msg)
1004         reply = self._client.read_reply()
1005         result = reply.split("|")
1006         code = int(result[0])
1007         text = base64.b64decode(result[1])
1008         if code == ERROR:
1009             raise RuntimeError(text)
1010         return text
1011
1012     def shutdown(self):
1013         msg = testbed_messages[SHUTDOWN]
1014         self._client.send_msg(msg)
1015         reply = self._client.read_reply()
1016         result = reply.split("|")
1017         code = int(result[0])
1018         text = base64.b64decode(result[1])
1019         if code == ERROR:
1020             raise RuntimeError(text)
1021         self._client.send_stop()
1022         self._client.read_reply() # wait for it
1023
class ExperimentControllerProxy(object):
    """Proxy that talks to an ExperimentControllerServer.

    Requests are sent through ``self._client`` (a ``server.Client``) and
    every reply is parsed as ``"<code>|<base64 text>"``.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            agent = None):
        """Optionally launch the controller server, then connect to it.

        When ``launch`` is true an ExperimentControllerServer is started:
        through ``server.popen_ssh_subprocess`` if ``host`` is given,
        otherwise directly in this process via ``run()``. In both cases a
        ``server.Client`` is created afterwards and stored in
        ``self._client``.

        Raises RuntimeError when ``launch`` is true and ``experiment_xml``
        is None, or when the ssh subprocess has already exited with a
        non-zero status right after being spawned.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                # Adjacent literals are used instead of a backslash
                # continuation so that source indentation does not leak
                # into the message.
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            # ssh
            if host is not None:
                xml = experiment_xml
                # NOTE: the backslash continuations below keep the source
                # indentation inside the string. It is left untouched on
                # purpose because this text is the code handed to
                # popen_ssh_subprocess.
                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
                        s = ExperimentControllerServer(%r, %r, %r);\
                        s.run()" % (root_dir, log_level, xml)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent)
                # poll() is only truthy for a non-zero exit status, i.e.
                # the process already terminated with an error
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _exchange(self, msg):
        """Send ``msg`` and return the parsed reply as ``(code, text)``."""
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _request(self, msg):
        """Send ``msg`` and return the decoded reply text.

        Raises RuntimeError carrying that text when the reply code equals
        ERROR.
        """
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Decoded text returned by the server for the XML request.

        Raises RuntimeError when the reply code equals ERROR.
        """
        return self._request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS message built from ``access_config``.

        The attributes "mode", "communication", "host", "user", "port",
        "rootDirectory", "useAgent" and "logLevel" are read from
        ``access_config`` with ``get_attribute_value``.

        Raises RuntimeError when the reply code equals ERROR.
        """
        mode = access_config.get_attribute_value("mode")
        communication = access_config.get_attribute_value("communication")
        host = access_config.get_attribute_value("host")
        user = access_config.get_attribute_value("user")
        port = access_config.get_attribute_value("port")
        root_dir = access_config.get_attribute_value("rootDirectory")
        use_agent = access_config.get_attribute_value("useAgent")
        log_level = access_config.get_attribute_value("logLevel")
        msg = controller_messages[ACCESS]
        msg = msg % (testbed_guid, mode, communication, host, user, port,
                root_dir, use_agent, log_level)
        self._request(msg)

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE message and return the decoded reply text.

        ``attribute`` is base64-encoded before being placed in the
        message.

        Raises RuntimeError for any reply code other than OK.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        msg = msg % (testbed_guid, guid, trace_id, attribute)
        code, text = self._exchange(msg)
        # unlike the other requests, anything that is not OK is an error
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send the START instruction; raise RuntimeError on ERROR."""
        self._request(controller_messages[START])

    def stop(self):
        """Send the STOP instruction; raise RuntimeError on ERROR."""
        self._request(controller_messages[STOP])

    def recover(self):
        """Send the RECOVER instruction; raise RuntimeError on ERROR."""
        self._request(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send a FINISHED message for ``guid``.

        Returns True when the decoded reply text is exactly "True".

        Raises RuntimeError when the reply code equals ERROR.
        """
        msg = controller_messages[FINISHED]
        msg = msg % guid
        return self._request(msg) == "True"

    def shutdown(self):
        """Send the SHUTDOWN instruction, then stop the client connection.

        Raises RuntimeError when the reply code equals ERROR; in that case
        the stop request is not sent.
        """
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1152