Ticket #25: controller recovery mode
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11 import tempfile
12 import shutil
13
14 # PROTOCOL REPLIES
15 OK = 0
16 ERROR = 1
17
18 # PROTOCOL INSTRUCTION MESSAGES
19 XML = 2 
20 ACCESS  = 3
21 TRACE   = 4
22 FINISHED    = 5
23 START   = 6
24 STOP    = 7
25 SHUTDOWN    = 8
26 CONFIGURE   = 9
27 CREATE      = 10
28 CREATE_SET  = 11
29 FACTORY_SET = 12
30 CONNECT     = 13
31 CROSS_CONNECT   = 14
32 ADD_TRACE   = 15
33 ADD_ADDRESS = 16
34 ADD_ROUTE   = 17
35 DO_SETUP    = 18
36 DO_CREATE   = 19
37 DO_CONNECT  = 20
38 DO_CONFIGURE    = 21
39 DO_CROSS_CONNECT    = 22
40 GET = 23
41 SET = 24
42 ACTION  = 25
43 STATUS  = 26
44 GUIDS  = 27
45 GET_ROUTE = 28
46 GET_ADDRESS = 29
47 RECOVER = 30
48
49 # PARAMETER TYPE
50 STRING  =  100
51 INTEGER = 101
52 BOOL    = 102
53 FLOAT   = 103
54
55 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
56 controller_messages = dict({
57     XML:    "%d" % XML,
58     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
59     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s|%s"),
60     FINISHED:   "%d|%s" % (FINISHED, "%d"),
61     START:  "%d" % START,
62     STOP:   "%d" % STOP,
63     RECOVER : "%d" % RECOVER,
64     SHUTDOWN:   "%d" % SHUTDOWN,
65     })
66
67 # TESTBED INSTANCE PROTOCOL MESSAGES
68 testbed_messages = dict({
69     TRACE:  "%d|%s" % (TRACE, "%d|%s|%s"),
70     START:  "%d" % START,
71     STOP:   "%d" % STOP,
72     SHUTDOWN:   "%d" % SHUTDOWN,
73     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
74     CREATE: "%d|%s" % (CREATE, "%d|%s"),
75     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
76     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
77     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
78     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
79     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
80     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
81     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
82     DO_SETUP:   "%d" % DO_SETUP,
83     DO_CREATE:  "%d" % DO_CREATE,
84     DO_CONNECT: "%d" % DO_CONNECT,
85     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
86     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
87     GET:    "%d|%s" % (GET, "%s|%d|%s"),
88     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
89     GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
90     GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
91     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
92     STATUS: "%d|%s" % (STATUS, "%d"),
93     GUIDS:  "%d" % GUIDS,
94     })
95
96 instruction_text = dict({
97     OK:     "OK",
98     ERROR:  "ERROR",
99     XML:    "XML",
100     ACCESS: "ACCESS",
101     TRACE:  "TRACE",
102     FINISHED:   "FINISHED",
103     START:  "START",
104     STOP:   "STOP",
105     RECOVER: "RECOVER",
106     SHUTDOWN:   "SHUTDOWN",
107     CONFIGURE:  "CONFIGURE",
108     CREATE: "CREATE",
109     CREATE_SET: "CREATE_SET",
110     FACTORY_SET:    "FACTORY_SET",
111     CONNECT:    "CONNECT",
112     CROSS_CONNECT: "CROSS_CONNECT",
113     ADD_TRACE:  "ADD_TRACE",
114     ADD_ADDRESS:    "ADD_ADDRESS",
115     ADD_ROUTE:  "ADD_ROUTE",
116     DO_SETUP:   "DO_SETUP",
117     DO_CREATE:  "DO_CREATE",
118     DO_CONNECT: "DO_CONNECT",
119     DO_CONFIGURE:   "DO_CONFIGURE",
120     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
121     GET:    "GET",
122     SET:    "SET",
123     GET_ROUTE: "GET_ROUTE",
124     GET_ADDRESS: "GET_ADDRESS",
125     ACTION: "ACTION",
126     STATUS: "STATUS",
127     GUIDS:  "GUIDS",
128     STRING: "STRING",
129     INTEGER:    "INTEGER",
130     BOOL:   "BOOL",
131     FLOAT:  "FLOAT"
132     })
133
134 def get_type(value):
135     if isinstance(value, bool):
136         return BOOL
137     elif isinstance(value, int):
138         return INTEGER
139     elif isinstance(value, float):
140         return FLOAT
141     else:
142         return STRING
143
144 def set_type(type, value):
145     if type == INTEGER:
146         value = int(value)
147     elif type == FLOAT:
148         value = float(value)
149     elif type == BOOL:
150         value = value == "True"
151     else:
152         value = str(value)
153     return value
154
155 def log_msg(server, params):
156     instr = int(params[0])
157     instr_txt = instruction_text[instr]
158     server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
159         instr_txt, ", ".join(map(str, params[1:]))))
160
161 def log_reply(server, reply):
162     res = reply.split("|")
163     code = int(res[0])
164     code_txt = instruction_text[code]
165     txt = base64.b64decode(res[1])
166     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
167             code_txt, txt))
168
169 def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
170     if python_code:
171         # launch daemon
172         proc = server.popen_ssh_subprocess(python_code, host = host,
173             port = port, user = user, agent = agent)
174         if proc.poll():
175             err = proc.stderr.read()
176             raise RuntimeError("Client could not be executed: %s" % \
177                     err)
178     # create client
179     return server.Client(root_dir, host = host, port = port, user = user, 
180             agent = agent)
181
182 def to_server_log_level(log_level):
183     return server.DEBUG_LEVEL \
184             if log_level == AccessConfiguration.DEBUG_LEVEL \
185                 else server.ERROR_LEVEL
186
187 def get_access_config_params(access_config):
188     root_dir = access_config.get_attribute_value("rootDirectory")
189     log_level = access_config.get_attribute_value("logLevel")
190     log_level = to_server_log_level(log_level)
191     user = host = port = agent = None
192     communication = access_config.get_attribute_value("communication")
193     if communication == AccessConfiguration.ACCESS_SSH:
194         user = access_config.get_attribute_value("user")
195         host = access_config.get_attribute_value("host")
196         port = access_config.get_attribute_value("port")
197         agent = access_config.get_attribute_value("useAgent")
198     return (root_dir, log_level, user, host, port, agent)
199
200 class AccessConfiguration(AttributesMap):
201     MODE_SINGLE_PROCESS = "SINGLE"
202     MODE_DAEMON = "DAEMON"
203     ACCESS_SSH = "SSH"
204     ACCESS_LOCAL = "LOCAL"
205     ERROR_LEVEL = "Error"
206     DEBUG_LEVEL = "Debug"
207
208     def __init__(self):
209         super(AccessConfiguration, self).__init__()
210         self.add_attribute(name = "mode",
211                 help = "Instance execution mode",
212                 type = Attribute.ENUM,
213                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
214                 allowed = [AccessConfiguration.MODE_DAEMON,
215                     AccessConfiguration.MODE_SINGLE_PROCESS],
216                 validation_function = validation.is_enum)
217         self.add_attribute(name = "communication",
218                 help = "Instance communication mode",
219                 type = Attribute.ENUM,
220                 value = AccessConfiguration.ACCESS_LOCAL,
221                 allowed = [AccessConfiguration.ACCESS_LOCAL,
222                     AccessConfiguration.ACCESS_SSH],
223                 validation_function = validation.is_enum)
224         self.add_attribute(name = "host",
225                 help = "Host where the testbed will be executed",
226                 type = Attribute.STRING,
227                 value = "localhost",
228                 validation_function = validation.is_string)
229         self.add_attribute(name = "user",
230                 help = "User on the Host to execute the testbed",
231                 type = Attribute.STRING,
232                 value = getpass.getuser(),
233                 validation_function = validation.is_string)
234         self.add_attribute(name = "port",
235                 help = "Port on the Host",
236                 type = Attribute.INTEGER,
237                 value = 22,
238                 validation_function = validation.is_integer)
239         self.add_attribute(name = "rootDirectory",
240                 help = "Root directory for storing process files",
241                 type = Attribute.STRING,
242                 value = ".",
243                 validation_function = validation.is_string) # TODO: validation.is_path
244         self.add_attribute(name = "useAgent",
245                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
246                 type = Attribute.BOOL,
247                 value = False,
248                 validation_function = validation.is_bool)
249         self.add_attribute(name = "logLevel",
250                 help = "Log level for instance",
251                 type = Attribute.ENUM,
252                 value = AccessConfiguration.ERROR_LEVEL,
253                 allowed = [AccessConfiguration.ERROR_LEVEL,
254                     AccessConfiguration.DEBUG_LEVEL],
255                 validation_function = validation.is_enum)
256         self.add_attribute(name = "recover",
257                 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
258                 type = Attribute.BOOL,
259                 value = False,
260                 validation_function = validation.is_bool)
261
262 class TempDir(object):
263     def __init__(self):
264         self.path = tempfile.mkdtemp()
265     
266     def __del__(self):
267         shutil.rmtree(self.path)
268
269 class PermDir(object):
270     def __init__(self, path):
271         self.path = path
272
273 def create_controller(xml, access_config = None):
274     mode = None if not access_config \
275             else access_config.get_attribute_value("mode")
276     launch = True if not access_config \
277             else not access_config.get_attribute_value("recover")
278     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
279         if not launch:
280             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
281         
282         from nepi.core.execute import ExperimentController
283         
284         if not access_config or not access_config.has_attribute("rootDirectory"):
285             root_dir = TempDir()
286         else:
287             root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
288         controller = ExperimentController(xml, root_dir.path)
289         
290         # inject reference to temporary dir, so that it gets cleaned
291         # up at destruction time.
292         controller._tempdir = root_dir
293         
294         return controller
295     elif mode == AccessConfiguration.MODE_DAEMON:
296         (root_dir, log_level, user, host, port, agent) = \
297                 get_access_config_params(access_config)
298         return ExperimentControllerProxy(root_dir, log_level,
299                 experiment_xml = xml, host = host, port = port, user = user, 
300                 agent = agent, launch = launch)
301     raise RuntimeError("Unsupported access configuration '%s'" % mode)
302
303 def create_testbed_instance(testbed_id, testbed_version, access_config):
304     mode = None if not access_config \
305             else access_config.get_attribute_value("mode")
306     launch = True if not access_config \
307             else not access_config.get_attribute_value("recover")
308     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
309         if not launch:
310             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
311         return  _build_testbed_instance(testbed_id, testbed_version)
312     elif mode == AccessConfiguration.MODE_DAEMON:
313         (root_dir, log_level, user, host, port, agent) = \
314                 get_access_config_params(access_config)
315         return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id, 
316                 testbed_version = testbed_version, host = host, port = port,
317                 user = user, agent = agent, launch = launch)
318     raise RuntimeError("Unsupported access configuration '%s'" % mode)
319
320 def _build_testbed_instance(testbed_id, testbed_version):
321     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
322     if not mod_name in sys.modules:
323         __import__(mod_name)
324     module = sys.modules[mod_name]
325     return module.TestbedInstance(testbed_version)
326
327 class TestbedInstanceServer(server.Server):
328     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
329         super(TestbedInstanceServer, self).__init__(root_dir, log_level)
330         self._testbed_id = testbed_id
331         self._testbed_version = testbed_version
332         self._testbed = None
333
334     def post_daemonize(self):
335         self._testbed = _build_testbed_instance(self._testbed_id, 
336                 self._testbed_version)
337
338     def reply_action(self, msg):
339         params = msg.split("|")
340         instruction = int(params[0])
341         log_msg(self, params)
342         try:
343             if instruction == TRACE:
344                 reply = self.trace(params)
345             elif instruction == START:
346                 reply = self.start(params)
347             elif instruction == STOP:
348                 reply = self.stop(params)
349             elif instruction == SHUTDOWN:
350                 reply = self.shutdown(params)
351             elif instruction == CONFIGURE:
352                 reply = self.defer_configure(params)
353             elif instruction == CREATE:
354                 reply = self.defer_create(params)
355             elif instruction == CREATE_SET:
356                 reply = self.defer_create_set(params)
357             elif instruction == FACTORY_SET:
358                 reply = self.defer_factory_set(params)
359             elif instruction == CONNECT:
360                 reply = self.defer_connect(params)
361             elif instruction == CROSS_CONNECT:
362                 reply = self.defer_cross_connect(params)
363             elif instruction == ADD_TRACE:
364                 reply = self.defer_add_trace(params)
365             elif instruction == ADD_ADDRESS:
366                 reply = self.defer_add_address(params)
367             elif instruction == ADD_ROUTE:
368                 reply = self.defer_add_route(params)
369             elif instruction == DO_SETUP:
370                 reply = self.do_setup(params)
371             elif instruction == DO_CREATE:
372                 reply = self.do_create(params)
373             elif instruction == DO_CONNECT:
374                 reply = self.do_connect(params)
375             elif instruction == DO_CONFIGURE:
376                 reply = self.do_configure(params)
377             elif instruction == DO_CROSS_CONNECT:
378                 reply = self.do_cross_connect(params)
379             elif instruction == GET:
380                 reply = self.get(params)
381             elif instruction == SET:
382                 reply = self.set(params)
383             elif instruction == GET_ADDRESS:
384                 reply = self.get_address(params)
385             elif instruction == GET_ROUTE:
386                 reply = self.get_route(params)
387             elif instruction == ACTION:
388                 reply = self.action(params)
389             elif instruction == STATUS:
390                 reply = self.status(params)
391             elif instruction == GUIDS:
392                 reply = self.guids(params)
393             else:
394                 error = "Invalid instruction %s" % instruction
395                 self.log_error(error)
396                 result = base64.b64encode(error)
397                 reply = "%d|%s" % (ERROR, result)
398         except:
399             error = self.log_error()
400             result = base64.b64encode(error)
401             reply = "%d|%s" % (ERROR, result)
402         log_reply(self, reply)
403         return reply
404
405     def guids(self, params):
406         guids = self._testbed.guids
407         guids = ",".join(map(str, guids))
408         result = base64.b64encode(guids)
409         return "%d|%s" % (OK, result)
410
411     def defer_create(self, params):
412         guid = int(params[1])
413         factory_id = params[2]
414         self._testbed.defer_create(guid, factory_id)
415         return "%d|%s" % (OK, "")
416
417     def trace(self, params):
418         guid = int(params[1])
419         trace_id = params[2]
420         attribute = base64.b64decode(params[3])
421         trace = self._testbed.trace(guid, trace_id, attribute)
422         result = base64.b64encode(trace)
423         return "%d|%s" % (OK, result)
424
425     def start(self, params):
426         self._testbed.start()
427         return "%d|%s" % (OK, "")
428
429     def stop(self, params):
430         self._testbed.stop()
431         return "%d|%s" % (OK, "")
432
433     def shutdown(self, params):
434         self._testbed.shutdown()
435         return "%d|%s" % (OK, "")
436
437     def defer_configure(self, params):
438         name = base64.b64decode(params[1])
439         value = base64.b64decode(params[2])
440         type = int(params[3])
441         value = set_type(type, value)
442         self._testbed.defer_configure(name, value)
443         return "%d|%s" % (OK, "")
444
445     def defer_create_set(self, params):
446         guid = int(params[1])
447         name = base64.b64decode(params[2])
448         value = base64.b64decode(params[3])
449         type = int(params[4])
450         value = set_type(type, value)
451         self._testbed.defer_create_set(guid, name, value)
452         return "%d|%s" % (OK, "")
453
454     def defer_factory_set(self, params):
455         name = base64.b64decode(params[1])
456         value = base64.b64decode(params[2])
457         type = int(params[3])
458         value = set_type(type, value)
459         self._testbed.defer_factory_set(name, value)
460         return "%d|%s" % (OK, "")
461
462     def defer_connect(self, params):
463         guid1 = int(params[1])
464         connector_type_name1 = params[2]
465         guid2 = int(params[3])
466         connector_type_name2 = params[4]
467         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
468             connector_type_name2)
469         return "%d|%s" % (OK, "")
470
471     def defer_cross_connect(self, params):
472         guid = int(params[1])
473         connector_type_name = params[2]
474         cross_guid = int(params[3])
475         connector_type_name = params[4]
476         cross_guid = int(params[5])
477         cross_testbed_id = params[6]
478         cross_factory_id = params[7]
479         cross_connector_type_name = params[8]
480         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
481             cross_testbed_id, cross_factory_id, cross_connector_type_name)
482         return "%d|%s" % (OK, "")
483
484     def defer_add_trace(self, params):
485         guid = int(params[1])
486         trace_id = params[2]
487         self._testbed.defer_add_trace(guid, trace_id)
488         return "%d|%s" % (OK, "")
489
490     def defer_add_address(self, params):
491         guid = int(params[1])
492         address = params[2]
493         netprefix = int(params[3])
494         broadcast = params[4]
495         self._testbed.defer_add_address(guid, address, netprefix,
496                 broadcast)
497         return "%d|%s" % (OK, "")
498
499     def defer_add_route(self, params):
500         guid = int(params[1])
501         destination = params[2]
502         netprefix = int(params[3])
503         nexthop = params[4]
504         self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
505         return "%d|%s" % (OK, "")
506
507     def do_setup(self, params):
508         self._testbed.do_setup()
509         return "%d|%s" % (OK, "")
510
511     def do_create(self, params):
512         self._testbed.do_create()
513         return "%d|%s" % (OK, "")
514
515     def do_connect(self, params):
516         self._testbed.do_connect()
517         return "%d|%s" % (OK, "")
518
519     def do_configure(self, params):
520         self._testbed.do_configure()
521         return "%d|%s" % (OK, "")
522
523     def do_cross_connect(self, params):
524         self._testbed.do_cross_connect()
525         return "%d|%s" % (OK, "")
526
527     def get(self, params):
528         time = params[1]
529         guid = int(param[2] )
530         name = base64.b64decode(params[3])
531         value = self._testbed.get(time, guid, name)
532         result = base64.b64encode(str(value))
533         return "%d|%s" % (OK, result)
534
535     def set(self, params):
536         time = params[1]
537         guid = int(params[2])
538         name = base64.b64decode(params[3])
539         value = base64.b64decode(params[4])
540         type = int(params[3])
541         value = set_type(type, value)
542         self._testbed.set(time, guid, name, value)
543         return "%d|%s" % (OK, "")
544
545     def get_address(self, params):
546         guid = int(param[1])
547         index = int(param[2])
548         attribute = base64.b64decode(param[3])
549         value = self._testbed.get_address(guid, index, attribute)
550         result = base64.b64encode(str(value))
551         return "%d|%s" % (OK, result)
552
553     def get_route(self, params):
554         guid = int(param[1])
555         index = int(param[2])
556         attribute = base64.b64decode(param[3])
557         value = self._testbed.get_route(guid, index, attribute)
558         result = base64.b64encode(str(value))
559         return "%d|%s" % (OK, result)
560
561     def action(self, params):
562         time = params[1]
563         guid = int(params[2])
564         command = base64.b64decode(params[3])
565         self._testbed.action(time, guid, command)
566         return "%d|%s" % (OK, "")
567
568     def status(self, params):
569         guid = int(params[1])
570         status = self._testbed.status(guid)
571         result = base64.b64encode(str(status))
572         return "%d|%s" % (OK, result)
573  
574 class ExperimentControllerServer(server.Server):
575     def __init__(self, root_dir, log_level, experiment_xml):
576         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
577         self._experiment_xml = experiment_xml
578         self._controller = None
579
580     def post_daemonize(self):
581         from nepi.core.execute import ExperimentController
582         self._controller = ExperimentController(self._experiment_xml, 
583             root_dir = self._root_dir)
584
585     def reply_action(self, msg):
586         if not msg:
587             result = base64.b64encode("Invalid command line")
588             reply = "%d|%s" % (ERROR, result)
589         else:
590             params = msg.split("|")
591             instruction = int(params[0])
592             log_msg(self, params)
593             try:
594                 if instruction == XML:
595                     reply = self.experiment_xml(params)
596                 elif instruction == ACCESS:
597                     reply = self.set_access_configuration(params)
598                 elif instruction == TRACE:
599                     reply = self.trace(params)
600                 elif instruction == FINISHED:
601                     reply = self.is_finished(params)
602                 elif instruction == START:
603                     reply = self.start(params)
604                 elif instruction == STOP:
605                     reply = self.stop(params)
606                 elif instruction == RECOVER:
607                     reply = self.recover(params)
608                 elif instruction == SHUTDOWN:
609                     reply = self.shutdown(params)
610                 else:
611                     error = "Invalid instruction %s" % instruction
612                     self.log_error(error)
613                     result = base64.b64encode(error)
614                     reply = "%d|%s" % (ERROR, result)
615             except:
616                 error = self.log_error()
617                 result = base64.b64encode(error)
618                 reply = "%d|%s" % (ERROR, result)
619         log_reply(self, reply)
620         return reply
621
622     def experiment_xml(self, params):
623         xml = self._controller.experiment_xml
624         result = base64.b64encode(xml)
625         return "%d|%s" % (OK, result)
626
627     def set_access_configuration(self, params):
628         testbed_guid = int(params[1])
629         mode = params[2]
630         communication = params[3]
631         host = params[4]
632         user = params[5]
633         port = int(params[6])
634         root_dir = params[7]
635         use_agent = params[8] == "True"
636         log_level = params[9]
637         access_config = AccessConfiguration()
638         access_config.set_attribute_value("mode", mode)
639         access_config.set_attribute_value("communication", communication)
640         access_config.set_attribute_value("host", host)
641         access_config.set_attribute_value("user", user)
642         access_config.set_attribute_value("port", port)
643         access_config.set_attribute_value("rootDirectory", root_dir)
644         access_config.set_attribute_value("useAgent", use_agent)
645         access_config.set_attribute_value("logLevel", log_level)
646         self._controller.set_access_configuration(testbed_guid, 
647                 access_config)
648         return "%d|%s" % (OK, "")
649
650     def trace(self, params):
651         testbed_guid = int(params[1])
652         guid = int(params[2])
653         trace_id = params[3]
654         attribute = base64.b64decode(params[4])
655         trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
656         result = base64.b64encode(trace)
657         return "%d|%s" % (OK, result)
658
659     def is_finished(self, params):
660         guid = int(params[1])
661         status = self._controller.is_finished(guid)
662         result = base64.b64encode(str(status))
663         return "%d|%s" % (OK, result)
664
665     def start(self, params):
666         self._controller.start()
667         return "%d|%s" % (OK, "")
668
669     def stop(self, params):
670         self._controller.stop()
671         return "%d|%s" % (OK, "")
672
673     def recover(self, params):
674         self._controller.recover()
675         return "%d|%s" % (OK, "")
676
677     def shutdown(self, params):
678         self._controller.shutdown()
679         return "%d|%s" % (OK, "")
680
681 class TestbedInstanceProxy(object):
682     def __init__(self, root_dir, log_level, testbed_id = None, 
683             testbed_version = None, launch = True, host = None, 
684             port = None, user = None, agent = None):
685         if launch:
686             if testbed_id == None or testbed_version == None:
687                 raise RuntimeError("To launch a TesbedInstance server a \
688                         testbed_id and testbed_version are required")
689             # ssh
690             if host != None:
691                 python_code = "from nepi.util.proxy import \
692                         TesbedInstanceServer;\
693                         s = TestbedInstanceServer('%s', %d, '%s', '%s');\
694                         s.run()" % (root_dir, log_level, testbed_id, 
695                                 testbed_version)
696                 self._client = launch_ssh_daemon_client(root_dir, python_code,
697                         host, port, user, agent)
698             else:
699                 # launch daemon
700                 s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
701                     testbed_version)
702                 s.run()
703                 # create client
704                 self._client = server.Client(root_dir)
705         else:
706             # attempt to reconnect
707             if host != None:
708                 self._client = launch_ssh_daemon_client(root_dir, None,
709                         host, port, user, agent)
710             else:
711                 self._client = server.Client(root_dir)
712
713     @property
714     def guids(self):
715         msg = testbed_messages[GUIDS]
716         self._client.send_msg(msg)
717         reply = self._client.read_reply()
718         result = reply.split("|")
719         code = int(result[0])
720         text = base64.b64decode(result[1])
721         if code == ERROR:
722             raise RuntimeError(text)
723         return map(int, text.split(","))
724
725     def defer_configure(self, name, value):
726         msg = testbed_messages[CONFIGURE]
727         type = get_type(value)
728         # avoid having "|" in this parameters
729         name = base64.b64encode(name)
730         value = base64.b64encode(str(value))
731         msg = msg % (name, value, type)
732         self._client.send_msg(msg)
733         reply = self._client.read_reply()
734         result = reply.split("|")
735         code = int(result[0])
736         text = base64.b64decode(result[1])
737         if code == ERROR:
738             raise RuntimeError(text)
739
740     def defer_create(self, guid, factory_id):
741         msg = testbed_messages[CREATE]
742         msg = msg % (guid, factory_id)
743         self._client.send_msg(msg)
744         reply = self._client.read_reply()
745         result = reply.split("|")
746         code = int(result[0])
747         text = base64.b64decode(result[1])
748         if code == ERROR:
749             raise RuntimeError(text)
750
751     def defer_create_set(self, guid, name, value):
752         msg = testbed_messages[CREATE_SET]
753         type = get_type(value)
754         # avoid having "|" in this parameters
755         name = base64.b64encode(name)
756         value = base64.b64encode(str(value))
757         msg = msg % (guid, name, value, type)
758         self._client.send_msg(msg)
759         reply = self._client.read_reply()
760         result = reply.split("|")
761         code = int(result[0])
762         text = base64.b64decode(result[1])
763         if code == ERROR:
764             raise RuntimeError(text)
765
766     def defer_factory_set(self, guid, name, value):
767         msg = testbed_messages[FACTORY_SET]
768         type = get_type(value)
769         # avoid having "|" in this parameters
770         name = base64.b64encode(name)
771         value = base64.b64encode(str(value))
772         msg = msg % (guid, name, value, type)
773         self._client.send_msg(msg)
774         reply = self._client.read_reply()
775         result = reply.split("|")
776         code = int(result[0])
777         text = base64.b64decode(result[1])
778         if code == ERROR:
779             raise RuntimeError(text)
780
781     def defer_connect(self, guid1, connector_type_name1, guid2, 
782             connector_type_name2): 
783         msg = testbed_messages[CONNECT]
784         msg = msg % (guid1, connector_type_name1, guid2, 
785             connector_type_name2)
786         self._client.send_msg(msg)
787         reply = self._client.read_reply()
788         result = reply.split("|")
789         code = int(result[0])
790         text = base64.b64decode(result[1])
791         if code == ERROR:
792             raise RuntimeError(text)
793
794     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
795             cross_testbed_id, cross_factory_id, cross_connector_type_name):
796         msg = testbed_messages[CROSS_CONNECT]
797         msg = msg % (guid, connector_type_name, cross_guid, 
798             cross_testbed_id, cross_factory_id, cross_connector_type_name)
799         self._client.send_msg(msg)
800         reply = self._client.read_reply()
801         result = reply.split("|")
802         code = int(result[0])
803         text = base64.b64decode(result[1])
804         if code == ERROR:
805             raise RuntimeError(text)
806
807     def defer_add_trace(self, guid, trace_id):
808         msg = testbed_messages[ADD_TRACE]
809         msg = msg % (guid, trace_id)
810         self._client.send_msg(msg)
811         reply = self._client.read_reply()
812         result = reply.split("|")
813         code = int(result[0])
814         text = base64.b64decode(result[1])
815         if code == ERROR:
816             raise RuntimeError(text)
817
818     def defer_add_address(self, guid, address, netprefix, broadcast): 
819         msg = testbed_messages[ADD_ADDRESS]
820         msg = msg % (guid, address, netprefix, broadcast)
821         self._client.send_msg(msg)
822         reply = self._client.read_reply()
823         result = reply.split("|")
824         code = int(result[0])
825         text = base64.b64decode(result[1])
826         if code == ERROR:
827             raise RuntimeError(text)
828
829     def defer_add_route(self, guid, destination, netprefix, nexthop):
830         msg = testbed_messages[ADD_ROUTE]
831         msg = msg % (guid, destination, netprefix, nexthop)
832         self._client.send_msg(msg)
833         reply = self._client.read_reply()
834         result = reply.split("|")
835         code = int(result[0])
836         text = base64.b64decode(result[1])
837         if code == ERROR:
838             raise RuntimeError(text)
839
840     def do_setup(self):
841         msg = testbed_messages[DO_SETUP]
842         self._client.send_msg(msg)
843         reply = self._client.read_reply()
844         result = reply.split("|")
845         code = int(result[0])
846         text = base64.b64decode(result[1])
847         if code == ERROR:
848             raise RuntimeError(text)
849
850     def do_create(self):
851         msg = testbed_messages[DO_CREATE]
852         self._client.send_msg(msg)
853         reply = self._client.read_reply()
854         result = reply.split("|")
855         code = int(result[0])
856         text = base64.b64decode(result[1])
857         if code == ERROR:
858             raise RuntimeError(text)
859
860     def do_connect(self):
861         msg = testbed_messages[DO_CONNECT]
862         self._client.send_msg(msg)
863         reply = self._client.read_reply()
864         result = reply.split("|")
865         code = int(result[0])
866         text = base64.b64decode(result[1])
867         if code == ERROR:
868             raise RuntimeError(text)
869
870     def do_configure(self):
871         msg = testbed_messages[DO_CONFIGURE]
872         self._client.send_msg(msg)
873         reply = self._client.read_reply()
874         result = reply.split("|")
875         code = int(result[0])
876         text = base64.b64decode(result[1])
877         if code == ERROR:
878             raise RuntimeError(text)
879
880     def do_cross_connect(self):
881         msg = testbed_messages[DO_CROSS_CONNECT]
882         self._client.send_msg(msg)
883         reply = self._client.read_reply()
884         result = reply.split("|")
885         code = int(result[0])
886         text = base64.b64decode(result[1])
887         if code == ERROR:
888             raise RuntimeError(text)
889
890     def start(self, time = TIME_NOW):
891         msg = testbed_messages[START]
892         self._client.send_msg(msg)
893         reply = self._client.read_reply()
894         result = reply.split("|")
895         code = int(result[0])
896         text = base64.b64decode(result[1])
897         if code == ERROR:
898             raise RuntimeError(text)
899
900     def stop(self, time = TIME_NOW):
901         msg = testbed_messages[STOP]
902         self._client.send_msg(msg)
903         reply = self._client.read_reply()
904         result = reply.split("|")
905         code = int(result[0])
906         text = base64.b64decode(result[1])
907         if code == ERROR:
908             raise RuntimeError(text)
909
910     def set(self, time, guid, name, value):
911         msg = testbed_messages[SET]
912         type = get_type(value)
913         # avoid having "|" in this parameters
914         name = base64.b64encode(name)
915         value = base64.b64encode(str(value))
916         msg = msg % (time, guid, name, value, type)
917         self._client.send_msg(msg)
918         reply = self._client.read_reply()
919         result = reply.split("|")
920         code = int(result[0])
921         text = base64.b64decode(result[1])
922         if code == ERROR:
923             raise RuntimeError(text)
924
925     def get(self, time, guid, name):
926         msg = testbed_messages[GET]
927         # avoid having "|" in this parameters
928         name = base64.b64encode(name)
929         msg = msg % (time, guid, name)
930         self._client.send_msg(msg)
931         reply = self._client.read_reply()
932         result = reply.split("|")
933         code = int(result[0])
934         text = base64.b64decode(result[1])
935         if code == ERROR:
936             raise RuntimeError(text)
937         return text
938
939     def get_address(self, guid, index, attribute):
940         msg = testbed_messages[GET_ADDRESS]
941         # avoid having "|" in this parameters
942         attribute = base64.b64encode(attribute)
943         msg = msg % (guid, index, attribute)
944         self._client.send_msg(msg)
945         reply = self._client.read_reply()
946         result = reply.split("|")
947         code = int(result[0])
948         text = base64.b64decode(result[1])
949         if code == ERROR:
950             raise RuntimeError(text)
951         return text
952
953     def get_route(self, guid, index, attribute):
954         msg = testbed_messages[GET_ROUTE]
955         # avoid having "|" in this parameters
956         attribute = base64.b64encode(attribute)
957         msg = msg % (guid, index, attribute)
958         self._client.send_msg(msg)
959         reply = self._client.read_reply()
960         result = reply.split("|")
961         code = int(result[0])
962         text = base64.b64decode(result[1])
963         if code == ERROR:
964             raise RuntimeError(text)
965         return text
966
967     def action(self, time, guid, action):
968         msg = testbed_messages[ACTION]
969         msg = msg % (time, guid, action)
970         self._client.send_msg(msg)
971         reply = self._client.read_reply()
972         result = reply.split("|")
973         code = int(result[0])
974         text = base64.b64decode(result[1])
975         if code == ERROR:
976             raise RuntimeError(text)
977
978     def status(self, guid):
979         msg = testbed_messages[STATUS]
980         msg = msg % (guid)
981         self._client.send_msg(msg)
982         reply = self._client.read_reply()
983         result = reply.split("|")
984         code = int(result[0])
985         text = base64.b64decode(result[1])
986         if code == ERROR:
987             raise RuntimeError(text)
988         return int(text)
989
990     def trace(self, guid, trace_id, attribute='value'):
991         msg = testbed_messages[TRACE]
992         attribute = base64.b64encode(attribute)
993         msg = msg % (guid, trace_id, attribute)
994         self._client.send_msg(msg)
995         reply = self._client.read_reply()
996         result = reply.split("|")
997         code = int(result[0])
998         text = base64.b64decode(result[1])
999         if code == ERROR:
1000             raise RuntimeError(text)
1001         return text
1002
1003     def shutdown(self):
1004         msg = testbed_messages[SHUTDOWN]
1005         self._client.send_msg(msg)
1006         reply = self._client.read_reply()
1007         result = reply.split("|")
1008         code = int(result[0])
1009         text = base64.b64decode(result[1])
1010         if code == ERROR:
1011             raise RuntimeError(text)
1012         self._client.send_stop()
1013         self._client.read_reply() # wait for it
1014
1015 class ExperimentControllerProxy(object):
1016     def __init__(self, root_dir, log_level, experiment_xml = None, 
1017             launch = True, host = None, port = None, user = None, 
1018             agent = None):
1019         if launch:
1020             # launch server
1021             if experiment_xml == None:
1022                 raise RuntimeError("To launch a ExperimentControllerServer a \
1023                         xml description of the experiment is required")
1024             # ssh
1025             if host != None:
1026                 xml = experiment_xml
1027                 xml = xml.replace("'", r"\'")
1028                 xml = xml.replace("\"", r"\'")
1029                 xml = xml.replace("\n", r"")
1030                 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1031                         s = ExperimentControllerServer(%r, %r, %r);\
1032                         s.run()" % (root_dir, log_level, xml)
1033                 self._client = launch_ssh_daemon_client(root_dir, python_code,
1034                         host, port, user, agent)
1035             else:
1036                 # launch daemon
1037                 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1038                 s.run()
1039                 # create client
1040                 self._client = server.Client(root_dir)
1041         else:
1042             # attempt to reconnect
1043             if host != None:
1044                 self._client = launch_ssh_daemon_client(root_dir, None,
1045                         host, port, user, agent)
1046             else:
1047                 self._client = server.Client(root_dir)
1048
1049     @property
1050     def experiment_xml(self):
1051         msg = controller_messages[XML]
1052         self._client.send_msg(msg)
1053         reply = self._client.read_reply()
1054         result = reply.split("|")
1055         code = int(result[0])
1056         text = base64.b64decode(result[1])
1057         if code == ERROR:
1058             raise RuntimeError(text)
1059         return text
1060
1061     def set_access_configuration(self, testbed_guid, access_config):
1062         mode = access_config.get_attribute_value("mode")
1063         communication = access_config.get_attribute_value("communication")
1064         host = access_config.get_attribute_value("host")
1065         user = access_config.get_attribute_value("user")
1066         port = access_config.get_attribute_value("port")
1067         root_dir = access_config.get_attribute_value("rootDirectory")
1068         use_agent = access_config.get_attribute_value("useAgent")
1069         log_level = access_config.get_attribute_value("logLevel")
1070         msg = controller_messages[ACCESS]
1071         msg = msg % (testbed_guid, mode, communication, host, user, port, 
1072                 root_dir, use_agent, log_level)
1073         self._client.send_msg(msg)
1074         reply = self._client.read_reply()
1075         result = reply.split("|")
1076         code = int(result[0])
1077         text =  base64.b64decode(result[1])
1078         if code == ERROR:
1079             raise RuntimeError(text)
1080
1081     def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1082         msg = controller_messages[TRACE]
1083         attribute = base64.b64encode(attribute)
1084         msg = msg % (testbed_guid, guid, trace_id, attribute)
1085         self._client.send_msg(msg)
1086         reply = self._client.read_reply()
1087         result = reply.split("|")
1088         code = int(result[0])
1089         text =  base64.b64decode(result[1])
1090         if code == OK:
1091             return text
1092         raise RuntimeError(text)
1093
1094     def start(self):
1095         msg = controller_messages[START]
1096         self._client.send_msg(msg)
1097         reply = self._client.read_reply()
1098         result = reply.split("|")
1099         code = int(result[0])
1100         text =  base64.b64decode(result[1])
1101         if code == ERROR:
1102             raise RuntimeError(text)
1103
1104     def stop(self):
1105         msg = controller_messages[STOP]
1106         self._client.send_msg(msg)
1107         reply = self._client.read_reply()
1108         result = reply.split("|")
1109         code = int(result[0])
1110         text =  base64.b64decode(result[1])
1111         if code == ERROR:
1112             raise RuntimeError(text)
1113
1114     def recover(self):
1115         msg = controller_messages[RECOVER]
1116         self._client.send_msg(msg)
1117         reply = self._client.read_reply()
1118         result = reply.split("|")
1119         code = int(result[0])
1120         text =  base64.b64decode(result[1])
1121         if code == ERROR:
1122             raise RuntimeError(text)
1123
1124     def is_finished(self, guid):
1125         msg = controller_messages[FINISHED]
1126         msg = msg % guid
1127         self._client.send_msg(msg)
1128         reply = self._client.read_reply()
1129         result = reply.split("|")
1130         code = int(result[0])
1131         text = base64.b64decode(result[1])
1132         if code == ERROR:
1133             raise RuntimeError(text)
1134         return text == "True"
1135
1136     def shutdown(self):
1137         msg = controller_messages[SHUTDOWN]
1138         self._client.send_msg(msg)
1139         reply = self._client.read_reply()
1140         result = reply.split("|")
1141         code = int(result[0])
1142         text =  base64.b64decode(result[1])
1143         if code == ERROR:
1144             raise RuntimeError(text)
1145         self._client.send_stop()
1146         self._client.read_reply() # wait for it
1147