db301ad1b70c12d6913dfa0289b18e66bee09c1f
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11 import tempfile
12 import shutil
13
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
XML = 2
ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30

# PARAMETER TYPE
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103

# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Every template starts with its own instruction code; the remaining
# "|"-separated placeholders are filled in by the sender with the "%" operator.
controller_messages = {
    XML: "%d" % XML,
    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
    TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
    FINISHED: "%d|%s" % (FINISHED, "%d"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    RECOVER: "%d" % RECOVER,
    SHUTDOWN: "%d" % SHUTDOWN,
}

# TESTBED INSTANCE PROTOCOL MESSAGES
testbed_messages = {
    TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
    START: "%d" % START,
    STOP: "%d" % STOP,
    SHUTDOWN: "%d" % SHUTDOWN,
    CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
    CREATE: "%d|%s" % (CREATE, "%d|%s"),
    CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
    FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
    CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
    CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
    ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
    ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
    ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
    DO_SETUP: "%d" % DO_SETUP,
    DO_CREATE: "%d" % DO_CREATE,
    DO_CONNECT: "%d" % DO_CONNECT,
    DO_CONFIGURE: "%d" % DO_CONFIGURE,
    DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
    GET: "%d|%s" % (GET, "%s|%d|%s"),
    SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
    # These two must carry their own codes: the server dispatches on the
    # leading code, so prefixing them with GET would route them to get().
    GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
    GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
    ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
    STATUS: "%d|%s" % (STATUS, "%d"),
    GUIDS: "%d" % GUIDS,
}

# Human readable names for every reply, instruction and parameter type code
# (used when logging messages and replies).
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT: "DO_CONNECT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
}
133
def get_type(value):
    """Return the protocol parameter type code that describes value.

    The result is BOOL, INTEGER or FLOAT for bool, int and float values
    respectively, and STRING for anything else.
    """
    # bool is a subclass of int, so it has to be tested first.
    for python_type, type_code in ((bool, BOOL), (int, INTEGER),
            (float, FLOAT)):
        if isinstance(value, python_type):
            return type_code
    return STRING
143
def set_type(type, value):
    """Convert value to the Python type named by the protocol code type.

    INTEGER and FLOAT go through int() and float(); BOOL is True only for
    the exact text "True"; any other code yields str(value).
    """
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    if type == BOOL:
        return value == "True"
    return str(value)
154
def log_msg(server, params):
    """Write an incoming protocol message to the server's debug log.

    params is the message already split on "|": params[0] is the numeric
    instruction code and the remaining items are its arguments.

    An instruction code missing from instruction_text is logged by number
    instead of raising KeyError, so the caller can still reject the message
    through its own "Invalid instruction" handling.
    """
    instr = int(params[0])
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    arguments = ", ".join(map(str, params[1:]))
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
        instr_txt, arguments))
160
def log_reply(server, reply):
    """Write an outgoing reply to the server's debug log.

    reply has the form "<code>|<base64 text>"; the code is translated to its
    name through instruction_text and the text is decoded before logging.
    """
    fields = reply.split("|")
    code_txt = instruction_text[int(fields[0])]
    decoded = base64.b64decode(fields[1])
    owner = server.__class__.__name__
    server.log_debug("%s - reply: %s %s" % (owner, code_txt, decoded))
168
def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
    """Optionally start a remote daemon over ssh and return a client for it.

    When python_code is non-empty it is handed to
    server.popen_ssh_subprocess; if the returned process reports a truthy
    poll() result, a RuntimeError carrying its stderr output is raised.
    When python_code is empty nothing is launched. In both cases a
    server.Client for root_dir with the same ssh settings is returned.
    """
    ssh_settings = dict(host=host, port=port, user=user, agent=agent)
    if python_code:
        # launch daemon
        daemon = server.popen_ssh_subprocess(python_code, **ssh_settings)
        if daemon.poll():
            raise RuntimeError("Client could not be executed: %s" %
                    daemon.stderr.read())
    # create client
    return server.Client(root_dir, **ssh_settings)
181
def to_server_log_level(log_level):
    """Map an AccessConfiguration log level name to the server module level.

    AccessConfiguration.DEBUG_LEVEL maps to server.DEBUG_LEVEL; every other
    value maps to server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
186
def get_access_config_params(access_config):
    """Extract the daemon connection parameters from access_config.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level is already converted with to_server_log_level. user, host,
    port and agent are only read when the communication mode is
    AccessConfiguration.ACCESS_SSH; otherwise they are all None.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        user = host = port = agent = None
    return (root_dir, log_level, user, host, port, agent)
199
class AccessConfiguration(AttributesMap):
    """Attribute map describing how an instance is executed and reached.

    The attributes registered here are: mode, communication, host, user,
    port, rootDirectory, useAgent, logLevel and recover.
    """

    # Allowed values for the "mode" attribute.
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    # Allowed values for the "communication" attribute.
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    # Allowed values for the "logLevel" attribute.
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        """Register every access attribute with its default value."""
        super(AccessConfiguration, self).__init__()
        self.add_attribute(
            name="mode",
            help="Instance execution mode",
            type=Attribute.ENUM,
            value=AccessConfiguration.MODE_SINGLE_PROCESS,
            allowed=[
                AccessConfiguration.MODE_DAEMON,
                AccessConfiguration.MODE_SINGLE_PROCESS,
            ],
            validation_function=validation.is_enum)
        self.add_attribute(
            name="communication",
            help="Instance communication mode",
            type=Attribute.ENUM,
            value=AccessConfiguration.ACCESS_LOCAL,
            allowed=[
                AccessConfiguration.ACCESS_LOCAL,
                AccessConfiguration.ACCESS_SSH,
            ],
            validation_function=validation.is_enum)
        self.add_attribute(
            name="host",
            help="Host where the testbed will be executed",
            type=Attribute.STRING,
            value="localhost",
            validation_function=validation.is_string)
        self.add_attribute(
            name="user",
            help="User on the Host to execute the testbed",
            type=Attribute.STRING,
            # Defaults to the user running this process.
            value=getpass.getuser(),
            validation_function=validation.is_string)
        self.add_attribute(
            name="port",
            help="Port on the Host",
            type=Attribute.INTEGER,
            value=22,
            validation_function=validation.is_integer)
        self.add_attribute(
            name="rootDirectory",
            help="Root directory for storing process files",
            type=Attribute.STRING,
            value=".",
            validation_function=validation.is_string)  # TODO: validation.is_path
        self.add_attribute(
            name="useAgent",
            help="Use -A option for forwarding of the authentication agent, if ssh access is used",
            type=Attribute.BOOL,
            value=False,
            validation_function=validation.is_bool)
        self.add_attribute(
            name="logLevel",
            help="Log level for instance",
            type=Attribute.ENUM,
            value=AccessConfiguration.ERROR_LEVEL,
            allowed=[
                AccessConfiguration.ERROR_LEVEL,
                AccessConfiguration.DEBUG_LEVEL,
            ],
            validation_function=validation.is_enum)
        self.add_attribute(
            name="recover",
            help="Do not instantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
            type=Attribute.BOOL,
            value=False,
            validation_function=validation.is_bool)
261
class TempDir(object):
    """Owner of a freshly created temporary directory.

    The directory is created on construction, exposed as the path
    attribute, and removed (with its contents) when the object is
    destroyed.
    """

    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # path is missing if mkdtemp() failed in __init__, and module globals
        # may already be cleared when this runs during interpreter shutdown.
        path = getattr(self, "path", None)
        if path is None or shutil is None:
            return
        # Best-effort cleanup: a directory that was already removed must not
        # make the destructor raise.
        shutil.rmtree(path, ignore_errors=True)
268
class PermDir(object):
    """Holder for an existing directory path.

    It exposes the same path attribute as TempDir, but defines no cleanup:
    nothing in this class removes the directory.
    """

    def __init__(self, path):
        # Store the caller's path unchanged.
        self.path = path
272
def create_controller(xml, access_config = None):
    """Create an experiment controller for the experiment described by xml.

    Without an access_config, or with mode MODE_SINGLE_PROCESS, an
    ExperimentController is built in this process. Its root directory is the
    configured "rootDirectory", or a temporary directory when no
    configuration (or no such attribute) is available.
    With mode MODE_DAEMON an ExperimentControllerProxy is returned instead;
    the "recover" attribute, negated, is passed to it as launch.

    Raises ValueError when "recover" is requested in single-process mode,
    and RuntimeError for any other mode value.
    """
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")
    else:
        mode = None
        launch = True

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute("rootDirectory"):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller

    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                agent = agent, launch = launch)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
302
def create_testbed_instance(testbed_id, testbed_version, access_config):
    """Create a testbed instance, either in this process or behind a proxy.

    Without an access_config, or with mode MODE_SINGLE_PROCESS, the testbed
    instance is built directly with _build_testbed_instance. With mode
    MODE_DAEMON a TestbedInstanceProxy is returned; the "recover" attribute,
    negated, is passed to it as launch.

    Raises ValueError when "recover" is requested in single-process mode,
    and RuntimeError for any other mode value.
    """
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")
    else:
        mode = None
        launch = True

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_instance(testbed_id, testbed_version)

    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
                testbed_version = testbed_version, host = host, port = port,
                user = user, agent = agent, launch = launch)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
319
320 def _build_testbed_instance(testbed_id, testbed_version):
321     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
322     if not mod_name in sys.modules:
323         __import__(mod_name)
324     module = sys.modules[mod_name]
325     return module.TestbedInstance(testbed_version)
326
class TestbedInstanceServer(server.Server):
    """Server that executes testbed protocol messages on a testbed instance.

    A message is a "|"-separated string whose first field is the numeric
    instruction code. Every reply has the form "<OK|ERROR>|<base64 text>".
    """

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedInstanceServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # Built later, in post_daemonize().
        self._testbed = None

    def post_daemonize(self):
        """Build the testbed instance this server will drive."""
        self._testbed = _build_testbed_instance(self._testbed_id,
                self._testbed_version)

    def _ok_reply(self, result = ""):
        """Format a successful reply carrying the already-encoded result."""
        return "%d|%s" % (OK, result)

    def _handlers(self):
        """Return the mapping from instruction code to handling method."""
        return {
            TRACE: self.trace,
            START: self.start,
            STOP: self.stop,
            SHUTDOWN: self.shutdown,
            CONFIGURE: self.defer_configure,
            CREATE: self.defer_create,
            CREATE_SET: self.defer_create_set,
            FACTORY_SET: self.defer_factory_set,
            CONNECT: self.defer_connect,
            CROSS_CONNECT: self.defer_cross_connect,
            ADD_TRACE: self.defer_add_trace,
            ADD_ADDRESS: self.defer_add_address,
            ADD_ROUTE: self.defer_add_route,
            DO_SETUP: self.do_setup,
            DO_CREATE: self.do_create,
            DO_CONNECT: self.do_connect,
            DO_CONFIGURE: self.do_configure,
            DO_CROSS_CONNECT: self.do_cross_connect,
            GET: self.get,
            SET: self.set,
            GET_ADDRESS: self.get_address,
            GET_ROUTE: self.get_route,
            ACTION: self.action,
            STATUS: self.status,
            GUIDS: self.guids,
        }

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction, or any failure while
        parsing or executing the message produces an ERROR reply; no
        exception is propagated to the caller.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            # Parsing and logging happen inside the try block so that a
            # malformed message is answered with ERROR instead of raising.
            try:
                params = msg.split("|")
                instruction = int(params[0])
                log_msg(self, params)
                handler = self._handlers().get(instruction)
                if handler is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
                else:
                    reply = handler(params)
            except:
                # Deliberately broad, as in the original protocol boundary:
                # whatever went wrong is logged and reported to the client.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """Reply with the testbed guids as base64 comma-separated text."""
        guids = ",".join(map(str, self._testbed.guids))
        return self._ok_reply(base64.b64encode(guids))

    def defer_create(self, params):
        """CREATE: params are (code, guid, factory_id)."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return self._ok_reply()

    def trace(self, params):
        """TRACE: params are (code, guid, trace_id, base64 attribute)."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        trace = self._testbed.trace(guid, trace_id, attribute)
        return self._ok_reply(base64.b64encode(trace))

    def start(self, params):
        self._testbed.start()
        return self._ok_reply()

    def stop(self, params):
        self._testbed.stop()
        return self._ok_reply()

    def shutdown(self, params):
        self._testbed.shutdown()
        return self._ok_reply()

    def defer_configure(self, params):
        """CONFIGURE: params are (code, base64 name, base64 value, type)."""
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        value = set_type(int(params[3]), value)
        self._testbed.defer_configure(name, value)
        return self._ok_reply()

    def defer_create_set(self, params):
        """CREATE_SET: params are (code, guid, base64 name, base64 value, type)."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        value = set_type(int(params[4]), value)
        self._testbed.defer_create_set(guid, name, value)
        return self._ok_reply()

    def defer_factory_set(self, params):
        """FACTORY_SET: params are (code, guid, base64 name, base64 value, type).

        The FACTORY_SET template and TestbedInstanceProxy.defer_factory_set
        both put the guid first, so it is parsed here and forwarded.
        """
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        value = set_type(int(params[4]), value)
        self._testbed.defer_factory_set(guid, name, value)
        return self._ok_reply()

    def defer_connect(self, params):
        """CONNECT: params are (code, guid1, connector1, guid2, connector2)."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2,
            connector_type_name2)
        return self._ok_reply()

    def defer_cross_connect(self, params):
        """CROSS_CONNECT: six arguments, in the order of the message template.

        The template is "%d|%s|%d|%d|%s|%s": guid, connector_type_name,
        cross_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name.
        """
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        # NOTE(review): the template formats this field with %d, so it is
        # converted back to int here - confirm against the testbed API.
        cross_testbed_id = int(params[4])
        cross_factory_id = params[5]
        cross_connector_type_name = params[6]
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
        return self._ok_reply()

    def defer_add_trace(self, params):
        """ADD_TRACE: params are (code, guid, trace_id)."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return self._ok_reply()

    def defer_add_address(self, params):
        """ADD_ADDRESS: params are (code, guid, address, netprefix, broadcast)."""
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)
        return self._ok_reply()

    def defer_add_route(self, params):
        """ADD_ROUTE: params are (code, guid, destination, netprefix, nexthop)."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return self._ok_reply()

    def do_setup(self, params):
        self._testbed.do_setup()
        return self._ok_reply()

    def do_create(self, params):
        self._testbed.do_create()
        return self._ok_reply()

    def do_connect(self, params):
        self._testbed.do_connect()
        return self._ok_reply()

    def do_configure(self, params):
        self._testbed.do_configure()
        return self._ok_reply()

    def do_cross_connect(self, params):
        self._testbed.do_cross_connect()
        return self._ok_reply()

    def get(self, params):
        """GET: params are (code, time, guid, base64 name)."""
        when = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = self._testbed.get(when, guid, name)
        return self._ok_reply(base64.b64encode(str(value)))

    def set(self, params):
        """SET: params are (code, time, guid, base64 name, base64 value, type)."""
        when = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        # The type code is the last field of the SET template.
        value = set_type(int(params[5]), value)
        self._testbed.set(when, guid, name, value)
        return self._ok_reply()

    def get_address(self, params):
        """GET_ADDRESS: params are (code, guid, index, base64 attribute)."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        return self._ok_reply(base64.b64encode(str(value)))

    def get_route(self, params):
        """GET_ROUTE: params are (code, guid, index, base64 attribute)."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        return self._ok_reply(base64.b64encode(str(value)))

    def action(self, params):
        """ACTION: params are (code, time, guid, base64 command)."""
        when = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(when, guid, command)
        return self._ok_reply()

    def status(self, params):
        """STATUS: params are (code, guid); replies with str(status) in base64."""
        guid = int(params[1])
        status = self._testbed.status(guid)
        return self._ok_reply(base64.b64encode(str(status)))
577  
class ExperimentControllerServer(server.Server):
    """Server that executes controller protocol messages on an ExperimentController.

    A message is a "|"-separated string whose first field is the numeric
    instruction code. Every reply has the form "<OK|ERROR>|<base64 text>".
    """

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # Built later, in post_daemonize().
        self._controller = None

    def post_daemonize(self):
        """Build the ExperimentController this server will drive."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml,
            root_dir = self._root_dir)

    def _ok_reply(self, result = ""):
        """Format a successful reply carrying the already-encoded result."""
        return "%d|%s" % (OK, result)

    def _handlers(self):
        """Return the mapping from instruction code to handling method."""
        return {
            XML: self.experiment_xml,
            ACCESS: self.set_access_configuration,
            TRACE: self.trace,
            FINISHED: self.is_finished,
            START: self.start,
            STOP: self.stop,
            RECOVER: self.recover,
            SHUTDOWN: self.shutdown,
        }

    def reply_action(self, msg):
        """Execute one protocol message and return the reply string.

        An empty message, an unknown instruction, or any failure while
        parsing or executing the message produces an ERROR reply; no
        exception is propagated to the caller.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            # Parsing and logging happen inside the try block so that a
            # malformed message is answered with ERROR instead of raising.
            try:
                params = msg.split("|")
                instruction = int(params[0])
                log_msg(self, params)
                handler = self._handlers().get(instruction)
                if handler is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
                else:
                    reply = handler(params)
            except:
                # Deliberately broad, as in the original protocol boundary:
                # whatever went wrong is logged and reported to the client.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """XML: reply with the controller's experiment_xml in base64."""
        xml = self._controller.experiment_xml
        return self._ok_reply(base64.b64encode(xml))

    def set_access_configuration(self, params):
        """ACCESS: rebuild an AccessConfiguration and hand it to the controller.

        params are (code, testbed_guid, mode, communication, host, user,
        port, root_dir, use_agent, log_level); use_agent is True only for
        the exact text "True".
        """
        testbed_guid = int(params[1])
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            ("useAgent", params[8] == "True"),
            ("logLevel", params[9]),
        )
        access_config = AccessConfiguration()
        for attribute_name, attribute_value in settings:
            access_config.set_attribute_value(attribute_name, attribute_value)
        self._controller.set_access_configuration(testbed_guid,
                access_config)
        return self._ok_reply()

    def trace(self, params):
        """TRACE: params are (code, testbed_guid, guid, trace_id, base64 attribute)."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
        return self._ok_reply(base64.b64encode(trace))

    def is_finished(self, params):
        """FINISHED: params are (code, guid); replies with str(status) in base64."""
        guid = int(params[1])
        status = self._controller.is_finished(guid)
        return self._ok_reply(base64.b64encode(str(status)))

    def start(self, params):
        self._controller.start()
        return self._ok_reply()

    def stop(self, params):
        self._controller.stop()
        return self._ok_reply()

    def recover(self, params):
        self._controller.recover()
        return self._ok_reply()

    def shutdown(self, params):
        self._controller.shutdown()
        return self._ok_reply()
684
685 class TestbedInstanceProxy(object):
686     def __init__(self, root_dir, log_level, testbed_id = None, 
687             testbed_version = None, launch = True, host = None, 
688             port = None, user = None, agent = None):
689         if launch:
690             if testbed_id == None or testbed_version == None:
691                 raise RuntimeError("To launch a TesbedInstance server a \
692                         testbed_id and testbed_version are required")
693             # ssh
694             if host != None:
695                 python_code = "from nepi.util.proxy import \
696                         TesbedInstanceServer;\
697                         s = TestbedInstanceServer('%s', %d, '%s', '%s');\
698                         s.run()" % (root_dir, log_level, testbed_id, 
699                                 testbed_version)
700                 self._client = launch_ssh_daemon_client(root_dir, python_code,
701                         host, port, user, agent)
702             else:
703                 # launch daemon
704                 s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
705                     testbed_version)
706                 s.run()
707                 # create client
708                 self._client = server.Client(root_dir)
709         else:
710             # attempt to reconnect
711             if host != None:
712                 self._client = launch_ssh_daemon_client(root_dir, None,
713                         host, port, user, agent)
714             else:
715                 self._client = server.Client(root_dir)
716
717     @property
718     def guids(self):
719         msg = testbed_messages[GUIDS]
720         self._client.send_msg(msg)
721         reply = self._client.read_reply()
722         result = reply.split("|")
723         code = int(result[0])
724         text = base64.b64decode(result[1])
725         if code == ERROR:
726             raise RuntimeError(text)
727         return map(int, text.split(","))
728
729     def defer_configure(self, name, value):
730         msg = testbed_messages[CONFIGURE]
731         type = get_type(value)
732         # avoid having "|" in this parameters
733         name = base64.b64encode(name)
734         value = base64.b64encode(str(value))
735         msg = msg % (name, value, type)
736         self._client.send_msg(msg)
737         reply = self._client.read_reply()
738         result = reply.split("|")
739         code = int(result[0])
740         text = base64.b64decode(result[1])
741         if code == ERROR:
742             raise RuntimeError(text)
743
744     def defer_create(self, guid, factory_id):
745         msg = testbed_messages[CREATE]
746         msg = msg % (guid, factory_id)
747         self._client.send_msg(msg)
748         reply = self._client.read_reply()
749         result = reply.split("|")
750         code = int(result[0])
751         text = base64.b64decode(result[1])
752         if code == ERROR:
753             raise RuntimeError(text)
754
755     def defer_create_set(self, guid, name, value):
756         msg = testbed_messages[CREATE_SET]
757         type = get_type(value)
758         # avoid having "|" in this parameters
759         name = base64.b64encode(name)
760         value = base64.b64encode(str(value))
761         msg = msg % (guid, name, value, type)
762         self._client.send_msg(msg)
763         reply = self._client.read_reply()
764         result = reply.split("|")
765         code = int(result[0])
766         text = base64.b64decode(result[1])
767         if code == ERROR:
768             raise RuntimeError(text)
769
770     def defer_factory_set(self, guid, name, value):
771         msg = testbed_messages[FACTORY_SET]
772         type = get_type(value)
773         # avoid having "|" in this parameters
774         name = base64.b64encode(name)
775         value = base64.b64encode(str(value))
776         msg = msg % (guid, name, value, type)
777         self._client.send_msg(msg)
778         reply = self._client.read_reply()
779         result = reply.split("|")
780         code = int(result[0])
781         text = base64.b64decode(result[1])
782         if code == ERROR:
783             raise RuntimeError(text)
784
785     def defer_connect(self, guid1, connector_type_name1, guid2, 
786             connector_type_name2): 
787         msg = testbed_messages[CONNECT]
788         msg = msg % (guid1, connector_type_name1, guid2, 
789             connector_type_name2)
790         self._client.send_msg(msg)
791         reply = self._client.read_reply()
792         result = reply.split("|")
793         code = int(result[0])
794         text = base64.b64decode(result[1])
795         if code == ERROR:
796             raise RuntimeError(text)
797
798     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
799             cross_testbed_id, cross_factory_id, cross_connector_type_name):
800         msg = testbed_messages[CROSS_CONNECT]
801         msg = msg % (guid, connector_type_name, cross_guid, 
802             cross_testbed_id, cross_factory_id, cross_connector_type_name)
803         self._client.send_msg(msg)
804         reply = self._client.read_reply()
805         result = reply.split("|")
806         code = int(result[0])
807         text = base64.b64decode(result[1])
808         if code == ERROR:
809             raise RuntimeError(text)
810
811     def defer_add_trace(self, guid, trace_id):
812         msg = testbed_messages[ADD_TRACE]
813         msg = msg % (guid, trace_id)
814         self._client.send_msg(msg)
815         reply = self._client.read_reply()
816         result = reply.split("|")
817         code = int(result[0])
818         text = base64.b64decode(result[1])
819         if code == ERROR:
820             raise RuntimeError(text)
821
822     def defer_add_address(self, guid, address, netprefix, broadcast): 
823         msg = testbed_messages[ADD_ADDRESS]
824         msg = msg % (guid, address, netprefix, broadcast)
825         self._client.send_msg(msg)
826         reply = self._client.read_reply()
827         result = reply.split("|")
828         code = int(result[0])
829         text = base64.b64decode(result[1])
830         if code == ERROR:
831             raise RuntimeError(text)
832
833     def defer_add_route(self, guid, destination, netprefix, nexthop):
834         msg = testbed_messages[ADD_ROUTE]
835         msg = msg % (guid, destination, netprefix, nexthop)
836         self._client.send_msg(msg)
837         reply = self._client.read_reply()
838         result = reply.split("|")
839         code = int(result[0])
840         text = base64.b64decode(result[1])
841         if code == ERROR:
842             raise RuntimeError(text)
843
844     def do_setup(self):
845         msg = testbed_messages[DO_SETUP]
846         self._client.send_msg(msg)
847         reply = self._client.read_reply()
848         result = reply.split("|")
849         code = int(result[0])
850         text = base64.b64decode(result[1])
851         if code == ERROR:
852             raise RuntimeError(text)
853
854     def do_create(self):
855         msg = testbed_messages[DO_CREATE]
856         self._client.send_msg(msg)
857         reply = self._client.read_reply()
858         result = reply.split("|")
859         code = int(result[0])
860         text = base64.b64decode(result[1])
861         if code == ERROR:
862             raise RuntimeError(text)
863
864     def do_connect(self):
865         msg = testbed_messages[DO_CONNECT]
866         self._client.send_msg(msg)
867         reply = self._client.read_reply()
868         result = reply.split("|")
869         code = int(result[0])
870         text = base64.b64decode(result[1])
871         if code == ERROR:
872             raise RuntimeError(text)
873
874     def do_configure(self):
875         msg = testbed_messages[DO_CONFIGURE]
876         self._client.send_msg(msg)
877         reply = self._client.read_reply()
878         result = reply.split("|")
879         code = int(result[0])
880         text = base64.b64decode(result[1])
881         if code == ERROR:
882             raise RuntimeError(text)
883
884     def do_cross_connect(self):
885         msg = testbed_messages[DO_CROSS_CONNECT]
886         self._client.send_msg(msg)
887         reply = self._client.read_reply()
888         result = reply.split("|")
889         code = int(result[0])
890         text = base64.b64decode(result[1])
891         if code == ERROR:
892             raise RuntimeError(text)
893
894     def start(self, time = TIME_NOW):
895         msg = testbed_messages[START]
896         self._client.send_msg(msg)
897         reply = self._client.read_reply()
898         result = reply.split("|")
899         code = int(result[0])
900         text = base64.b64decode(result[1])
901         if code == ERROR:
902             raise RuntimeError(text)
903
904     def stop(self, time = TIME_NOW):
905         msg = testbed_messages[STOP]
906         self._client.send_msg(msg)
907         reply = self._client.read_reply()
908         result = reply.split("|")
909         code = int(result[0])
910         text = base64.b64decode(result[1])
911         if code == ERROR:
912             raise RuntimeError(text)
913
914     def set(self, time, guid, name, value):
915         msg = testbed_messages[SET]
916         type = get_type(value)
917         # avoid having "|" in this parameters
918         name = base64.b64encode(name)
919         value = base64.b64encode(str(value))
920         msg = msg % (time, guid, name, value, type)
921         self._client.send_msg(msg)
922         reply = self._client.read_reply()
923         result = reply.split("|")
924         code = int(result[0])
925         text = base64.b64decode(result[1])
926         if code == ERROR:
927             raise RuntimeError(text)
928
929     def get(self, time, guid, name):
930         msg = testbed_messages[GET]
931         # avoid having "|" in this parameters
932         name = base64.b64encode(name)
933         msg = msg % (time, guid, name)
934         self._client.send_msg(msg)
935         reply = self._client.read_reply()
936         result = reply.split("|")
937         code = int(result[0])
938         text = base64.b64decode(result[1])
939         if code == ERROR:
940             raise RuntimeError(text)
941         return text
942
943     def get_address(self, guid, index, attribute):
944         msg = testbed_messages[GET_ADDRESS]
945         # avoid having "|" in this parameters
946         attribute = base64.b64encode(attribute)
947         msg = msg % (guid, index, attribute)
948         self._client.send_msg(msg)
949         reply = self._client.read_reply()
950         result = reply.split("|")
951         code = int(result[0])
952         text = base64.b64decode(result[1])
953         if code == ERROR:
954             raise RuntimeError(text)
955         return text
956
957     def get_route(self, guid, index, attribute):
958         msg = testbed_messages[GET_ROUTE]
959         # avoid having "|" in this parameters
960         attribute = base64.b64encode(attribute)
961         msg = msg % (guid, index, attribute)
962         self._client.send_msg(msg)
963         reply = self._client.read_reply()
964         result = reply.split("|")
965         code = int(result[0])
966         text = base64.b64decode(result[1])
967         if code == ERROR:
968             raise RuntimeError(text)
969         return text
970
971     def action(self, time, guid, action):
972         msg = testbed_messages[ACTION]
973         msg = msg % (time, guid, action)
974         self._client.send_msg(msg)
975         reply = self._client.read_reply()
976         result = reply.split("|")
977         code = int(result[0])
978         text = base64.b64decode(result[1])
979         if code == ERROR:
980             raise RuntimeError(text)
981
982     def status(self, guid):
983         msg = testbed_messages[STATUS]
984         msg = msg % (guid)
985         self._client.send_msg(msg)
986         reply = self._client.read_reply()
987         result = reply.split("|")
988         code = int(result[0])
989         text = base64.b64decode(result[1])
990         if code == ERROR:
991             raise RuntimeError(text)
992         return int(text)
993
994     def trace(self, guid, trace_id, attribute='value'):
995         msg = testbed_messages[TRACE]
996         attribute = base64.b64encode(attribute)
997         msg = msg % (guid, trace_id, attribute)
998         self._client.send_msg(msg)
999         reply = self._client.read_reply()
1000         result = reply.split("|")
1001         code = int(result[0])
1002         text = base64.b64decode(result[1])
1003         if code == ERROR:
1004             raise RuntimeError(text)
1005         return text
1006
1007     def shutdown(self):
1008         msg = testbed_messages[SHUTDOWN]
1009         self._client.send_msg(msg)
1010         reply = self._client.read_reply()
1011         result = reply.split("|")
1012         code = int(result[0])
1013         text = base64.b64decode(result[1])
1014         if code == ERROR:
1015             raise RuntimeError(text)
1016         self._client.send_stop()
1017         self._client.read_reply() # wait for it
1018
class ExperimentControllerProxy(object):
    """Client-side proxy that exchanges controller messages with a server.

    Every public operation sends one entry of ``controller_messages``
    through ``self._client`` and parses a "|"-separated reply whose first
    field is a numeric code and whose second field is base64-encoded text.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None, 
            launch = True, host = None, port = None, user = None, 
            agent = None):
        """Create the client, optionally launching the server first.

        With ``launch`` true an ExperimentControllerServer is started,
        through ``launch_ssh_daemon_client`` when ``host`` is given and
        directly in this process otherwise. With ``launch`` false only a
        client is created, to reconnect to an existing server.

        Raises RuntimeError when ``launch`` is true and no
        ``experiment_xml`` is supplied.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                # Adjacent literals instead of a backslash continuation, so
                # source indentation does not leak into the message.
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            # ssh
            if host is not None:
                xml = experiment_xml
                xml = xml.replace("'", r"\'")
                xml = xml.replace("\"", r"\'")
                xml = xml.replace("\n", r"")
                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
                        s = ExperimentControllerServer(%r, %r, %r);\
                        s.run()" % (root_dir, log_level, xml)
                self._client = launch_ssh_daemon_client(root_dir, python_code,
                        host, port, user, agent)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
                s.run()
                # create client
                self._client = server.Client(root_dir)
        else:
            # attempt to reconnect
            if host is not None:
                self._client = launch_ssh_daemon_client(root_dir, None,
                        host, port, user, agent)
            else:
                self._client = server.Client(root_dir)

    def _exchange(self, msg):
        """Send ``msg`` and return the reply as a ``(code, text)`` pair."""
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _request(self, msg):
        """Send ``msg``; return the reply text, raising RuntimeError on ERROR."""
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Decoded text of the reply to the XML message.

        Raises RuntimeError when the reply code equals ERROR.
        """
        return self._request(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS message built from ``access_config`` attributes.

        Reads the "mode", "communication", "host", "user", "port",
        "rootDirectory", "useAgent" and "logLevel" attribute values.
        Raises RuntimeError when the reply code equals ERROR.
        """
        mode = access_config.get_attribute_value("mode")
        communication = access_config.get_attribute_value("communication")
        host = access_config.get_attribute_value("host")
        user = access_config.get_attribute_value("user")
        port = access_config.get_attribute_value("port")
        root_dir = access_config.get_attribute_value("rootDirectory")
        use_agent = access_config.get_attribute_value("useAgent")
        log_level = access_config.get_attribute_value("logLevel")
        msg = controller_messages[ACCESS]
        msg = msg % (testbed_guid, mode, communication, host, user, port, 
                root_dir, use_agent, log_level)
        self._request(msg)

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE message and return the decoded reply text.

        ``attribute`` is base64-encoded before being sent. Unlike the other
        operations, this raises RuntimeError for any reply code other than
        OK, not only for ERROR.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        msg = msg % (testbed_guid, guid, trace_id, attribute)
        code, text = self._exchange(msg)
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send the START message; raise RuntimeError on an ERROR reply."""
        self._request(controller_messages[START])

    def stop(self):
        """Send the STOP message; raise RuntimeError on an ERROR reply."""
        self._request(controller_messages[STOP])

    def recover(self):
        """Send the RECOVER message; raise RuntimeError on an ERROR reply."""
        self._request(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send a FINISHED message for ``guid``.

        Returns whether the decoded reply text equals "True".
        Raises RuntimeError when the reply code equals ERROR.
        """
        msg = controller_messages[FINISHED]
        msg = msg % guid
        text = self._request(msg)
        return text == "True"

    def shutdown(self):
        """Send the SHUTDOWN message, then stop the client connection.

        Raises RuntimeError when the reply code equals ERROR; in that
        case no stop is sent.
        """
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1151