574ef6bfed71dc841d96c922a31d9a47f2a75dbf
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import sys
10 import time
11 import tempfile
12 import shutil
13
14 # PROTOCOL REPLIES
15 OK = 0
16 ERROR = 1
17
18 # PROTOCOL INSTRUCTION MESSAGES
19 XML = 2 
20 ACCESS  = 3
21 TRACE   = 4
22 FINISHED    = 5
23 START   = 6
24 STOP    = 7
25 SHUTDOWN    = 8
26 CONFIGURE   = 9
27 CREATE      = 10
28 CREATE_SET  = 11
29 FACTORY_SET = 12
30 CONNECT     = 13
31 CROSS_CONNECT   = 14
32 ADD_TRACE   = 15
33 ADD_ADDRESS = 16
34 ADD_ROUTE   = 17
35 DO_SETUP    = 18
36 DO_CREATE   = 19
37 DO_CONNECT  = 20
38 DO_CONFIGURE    = 21
39 DO_CROSS_CONNECT    = 22
40 GET = 23
41 SET = 24
42 ACTION  = 25
43 STATUS  = 26
44 GUIDS  = 27
45 GET_ROUTE = 28
46 GET_ADDRESS = 29
47 RECOVER = 30
48
49 # PARAMETER TYPE
50 STRING  =  100
51 INTEGER = 101
52 BOOL    = 102
53 FLOAT   = 103
54
55 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
56 controller_messages = dict({
57     XML:    "%d" % XML,
58     ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
59     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s|%s"),
60     FINISHED:   "%d|%s" % (FINISHED, "%d"),
61     START:  "%d" % START,
62     STOP:   "%d" % STOP,
63     RECOVER : "%d" % RECOVER,
64     SHUTDOWN:   "%d" % SHUTDOWN,
65     })
66
67 # TESTBED INSTANCE PROTOCOL MESSAGES
68 testbed_messages = dict({
69     TRACE:  "%d|%s" % (TRACE, "%d|%s|%s"),
70     START:  "%d" % START,
71     STOP:   "%d" % STOP,
72     SHUTDOWN:   "%d" % SHUTDOWN,
73     CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
74     CREATE: "%d|%s" % (CREATE, "%d|%s"),
75     CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
76     FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
77     CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
78     CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
79     ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
80     ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
81     ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
82     DO_SETUP:   "%d" % DO_SETUP,
83     DO_CREATE:  "%d" % DO_CREATE,
84     DO_CONNECT: "%d" % DO_CONNECT,
85     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
86     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
87     GET:    "%d|%s" % (GET, "%s|%d|%s"),
88     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
89     GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
90     GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
91     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
92     STATUS: "%d|%s" % (STATUS, "%d"),
93     GUIDS:  "%d" % GUIDS,
94     })
95
96 instruction_text = dict({
97     OK:     "OK",
98     ERROR:  "ERROR",
99     XML:    "XML",
100     ACCESS: "ACCESS",
101     TRACE:  "TRACE",
102     FINISHED:   "FINISHED",
103     START:  "START",
104     STOP:   "STOP",
105     RECOVER: "RECOVER",
106     SHUTDOWN:   "SHUTDOWN",
107     CONFIGURE:  "CONFIGURE",
108     CREATE: "CREATE",
109     CREATE_SET: "CREATE_SET",
110     FACTORY_SET:    "FACTORY_SET",
111     CONNECT:    "CONNECT",
112     CROSS_CONNECT: "CROSS_CONNECT",
113     ADD_TRACE:  "ADD_TRACE",
114     ADD_ADDRESS:    "ADD_ADDRESS",
115     ADD_ROUTE:  "ADD_ROUTE",
116     DO_SETUP:   "DO_SETUP",
117     DO_CREATE:  "DO_CREATE",
118     DO_CONNECT: "DO_CONNECT",
119     DO_CONFIGURE:   "DO_CONFIGURE",
120     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
121     GET:    "GET",
122     SET:    "SET",
123     GET_ROUTE: "GET_ROUTE",
124     GET_ADDRESS: "GET_ADDRESS",
125     ACTION: "ACTION",
126     STATUS: "STATUS",
127     GUIDS:  "GUIDS",
128     STRING: "STRING",
129     INTEGER:    "INTEGER",
130     BOOL:   "BOOL",
131     FLOAT:  "FLOAT"
132     })
133
134 def get_type(value):
135     if isinstance(value, bool):
136         return BOOL
137     elif isinstance(value, int):
138         return INTEGER
139     elif isinstance(value, float):
140         return FLOAT
141     else:
142         return STRING
143
144 def set_type(type, value):
145     if type == INTEGER:
146         value = int(value)
147     elif type == FLOAT:
148         value = float(value)
149     elif type == BOOL:
150         value = value == "True"
151     else:
152         value = str(value)
153     return value
154
155 def log_msg(server, params):
156     instr = int(params[0])
157     instr_txt = instruction_text[instr]
158     server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
159         instr_txt, ", ".join(map(str, params[1:]))))
160
161 def log_reply(server, reply):
162     res = reply.split("|")
163     code = int(res[0])
164     code_txt = instruction_text[code]
165     txt = base64.b64decode(res[1])
166     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
167             code_txt, txt))
168
169 def to_server_log_level(log_level):
170     return server.DEBUG_LEVEL \
171             if log_level == AccessConfiguration.DEBUG_LEVEL \
172                 else server.ERROR_LEVEL
173
174 def get_access_config_params(access_config):
175     root_dir = access_config.get_attribute_value("rootDirectory")
176     log_level = access_config.get_attribute_value("logLevel")
177     log_level = to_server_log_level(log_level)
178     user = host = port = agent = None
179     communication = access_config.get_attribute_value("communication")
180     if communication == AccessConfiguration.ACCESS_SSH:
181         user = access_config.get_attribute_value("user")
182         host = access_config.get_attribute_value("host")
183         port = access_config.get_attribute_value("port")
184         agent = access_config.get_attribute_value("useAgent")
185     return (root_dir, log_level, user, host, port, agent)
186
187 class AccessConfiguration(AttributesMap):
188     MODE_SINGLE_PROCESS = "SINGLE"
189     MODE_DAEMON = "DAEMON"
190     ACCESS_SSH = "SSH"
191     ACCESS_LOCAL = "LOCAL"
192     ERROR_LEVEL = "Error"
193     DEBUG_LEVEL = "Debug"
194
195     def __init__(self):
196         super(AccessConfiguration, self).__init__()
197         self.add_attribute(name = "mode",
198                 help = "Instance execution mode",
199                 type = Attribute.ENUM,
200                 value = AccessConfiguration.MODE_SINGLE_PROCESS,
201                 allowed = [AccessConfiguration.MODE_DAEMON,
202                     AccessConfiguration.MODE_SINGLE_PROCESS],
203                 validation_function = validation.is_enum)
204         self.add_attribute(name = "communication",
205                 help = "Instance communication mode",
206                 type = Attribute.ENUM,
207                 value = AccessConfiguration.ACCESS_LOCAL,
208                 allowed = [AccessConfiguration.ACCESS_LOCAL,
209                     AccessConfiguration.ACCESS_SSH],
210                 validation_function = validation.is_enum)
211         self.add_attribute(name = "host",
212                 help = "Host where the testbed will be executed",
213                 type = Attribute.STRING,
214                 value = "localhost",
215                 validation_function = validation.is_string)
216         self.add_attribute(name = "user",
217                 help = "User on the Host to execute the testbed",
218                 type = Attribute.STRING,
219                 value = getpass.getuser(),
220                 validation_function = validation.is_string)
221         self.add_attribute(name = "port",
222                 help = "Port on the Host",
223                 type = Attribute.INTEGER,
224                 value = 22,
225                 validation_function = validation.is_integer)
226         self.add_attribute(name = "rootDirectory",
227                 help = "Root directory for storing process files",
228                 type = Attribute.STRING,
229                 value = ".",
230                 validation_function = validation.is_string) # TODO: validation.is_path
231         self.add_attribute(name = "useAgent",
232                 help = "Use -A option for forwarding of the authentication agent, if ssh access is used", 
233                 type = Attribute.BOOL,
234                 value = False,
235                 validation_function = validation.is_bool)
236         self.add_attribute(name = "logLevel",
237                 help = "Log level for instance",
238                 type = Attribute.ENUM,
239                 value = AccessConfiguration.ERROR_LEVEL,
240                 allowed = [AccessConfiguration.ERROR_LEVEL,
241                     AccessConfiguration.DEBUG_LEVEL],
242                 validation_function = validation.is_enum)
243         self.add_attribute(name = "recover",
244                 help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
245                 type = Attribute.BOOL,
246                 value = False,
247                 validation_function = validation.is_bool)
248
249 class TempDir(object):
250     def __init__(self):
251         self.path = tempfile.mkdtemp()
252     
253     def __del__(self):
254         shutil.rmtree(self.path)
255
256 class PermDir(object):
257     def __init__(self, path):
258         self.path = path
259
260 def create_controller(xml, access_config = None):
261     mode = None if not access_config \
262             else access_config.get_attribute_value("mode")
263     launch = True if not access_config \
264             else not access_config.get_attribute_value("recover")
265     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
266         if not launch:
267             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
268         
269         from nepi.core.execute import ExperimentController
270         
271         if not access_config or not access_config.has_attribute("rootDirectory"):
272             root_dir = TempDir()
273         else:
274             root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
275         controller = ExperimentController(xml, root_dir.path)
276         
277         # inject reference to temporary dir, so that it gets cleaned
278         # up at destruction time.
279         controller._tempdir = root_dir
280         
281         return controller
282     elif mode == AccessConfiguration.MODE_DAEMON:
283         (root_dir, log_level, user, host, port, agent) = \
284                 get_access_config_params(access_config)
285         return ExperimentControllerProxy(root_dir, log_level,
286                 experiment_xml = xml, host = host, port = port, user = user, 
287                 agent = agent, launch = launch)
288     raise RuntimeError("Unsupported access configuration '%s'" % mode)
289
290 def create_testbed_instance(testbed_id, testbed_version, access_config):
291     mode = None if not access_config \
292             else access_config.get_attribute_value("mode")
293     launch = True if not access_config \
294             else not access_config.get_attribute_value("recover")
295     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
296         if not launch:
297             raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
298         return  _build_testbed_instance(testbed_id, testbed_version)
299     elif mode == AccessConfiguration.MODE_DAEMON:
300         (root_dir, log_level, user, host, port, agent) = \
301                 get_access_config_params(access_config)
302         return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, 
303                 testbed_version = testbed_version, host = host, port = port,
304                 user = user, agent = agent, launch = launch)
305     raise RuntimeError("Unsupported access configuration '%s'" % mode)
306
307 def _build_testbed_instance(testbed_id, testbed_version):
308     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
309     if not mod_name in sys.modules:
310         __import__(mod_name)
311     module = sys.modules[mod_name]
312     return module.TestbedController(testbed_version)
313
314 class TestbedControllerServer(server.Server):
315     def __init__(self, root_dir, log_level, testbed_id, testbed_version):
316         super(TestbedControllerServer, self).__init__(root_dir, log_level)
317         self._testbed_id = testbed_id
318         self._testbed_version = testbed_version
319         self._testbed = None
320
321     def post_daemonize(self):
322         self._testbed = _build_testbed_instance(self._testbed_id, 
323                 self._testbed_version)
324
325     def reply_action(self, msg):
326         if not msg:
327             result = base64.b64encode("Invalid command line")
328             reply = "%d|%s" % (ERROR, result)
329         else:
330             params = msg.split("|")
331             instruction = int(params[0])
332             log_msg(self, params)
333             try:
334                 if instruction == TRACE:
335                     reply = self.trace(params)
336                 elif instruction == START:
337                     reply = self.start(params)
338                 elif instruction == STOP:
339                     reply = self.stop(params)
340                 elif instruction == SHUTDOWN:
341                     reply = self.shutdown(params)
342                 elif instruction == CONFIGURE:
343                     reply = self.defer_configure(params)
344                 elif instruction == CREATE:
345                     reply = self.defer_create(params)
346                 elif instruction == CREATE_SET:
347                     reply = self.defer_create_set(params)
348                 elif instruction == FACTORY_SET:
349                     reply = self.defer_factory_set(params)
350                 elif instruction == CONNECT:
351                     reply = self.defer_connect(params)
352                 elif instruction == CROSS_CONNECT:
353                     reply = self.defer_cross_connect(params)
354                 elif instruction == ADD_TRACE:
355                     reply = self.defer_add_trace(params)
356                 elif instruction == ADD_ADDRESS:
357                     reply = self.defer_add_address(params)
358                 elif instruction == ADD_ROUTE:
359                     reply = self.defer_add_route(params)
360                 elif instruction == DO_SETUP:
361                     reply = self.do_setup(params)
362                 elif instruction == DO_CREATE:
363                     reply = self.do_create(params)
364                 elif instruction == DO_CONNECT:
365                     reply = self.do_connect(params)
366                 elif instruction == DO_CONFIGURE:
367                     reply = self.do_configure(params)
368                 elif instruction == DO_CROSS_CONNECT:
369                     reply = self.do_cross_connect(params)
370                 elif instruction == GET:
371                     reply = self.get(params)
372                 elif instruction == SET:
373                     reply = self.set(params)
374                 elif instruction == GET_ADDRESS:
375                     reply = self.get_address(params)
376                 elif instruction == GET_ROUTE:
377                     reply = self.get_route(params)
378                 elif instruction == ACTION:
379                     reply = self.action(params)
380                 elif instruction == STATUS:
381                     reply = self.status(params)
382                 elif instruction == GUIDS:
383                     reply = self.guids(params)
384                 else:
385                     error = "Invalid instruction %s" % instruction
386                     self.log_error(error)
387                     result = base64.b64encode(error)
388                     reply = "%d|%s" % (ERROR, result)
389             except:
390                 error = self.log_error()
391                 result = base64.b64encode(error)
392                 reply = "%d|%s" % (ERROR, result)
393         log_reply(self, reply)
394         return reply
395
396     def guids(self, params):
397         guids = self._testbed.guids
398         guids = ",".join(map(str, guids))
399         result = base64.b64encode(guids)
400         return "%d|%s" % (OK, result)
401
402     def defer_create(self, params):
403         guid = int(params[1])
404         factory_id = params[2]
405         self._testbed.defer_create(guid, factory_id)
406         return "%d|%s" % (OK, "")
407
408     def trace(self, params):
409         guid = int(params[1])
410         trace_id = params[2]
411         attribute = base64.b64decode(params[3])
412         trace = self._testbed.trace(guid, trace_id, attribute)
413         result = base64.b64encode(trace)
414         return "%d|%s" % (OK, result)
415
416     def start(self, params):
417         self._testbed.start()
418         return "%d|%s" % (OK, "")
419
420     def stop(self, params):
421         self._testbed.stop()
422         return "%d|%s" % (OK, "")
423
424     def shutdown(self, params):
425         self._testbed.shutdown()
426         return "%d|%s" % (OK, "")
427
428     def defer_configure(self, params):
429         name = base64.b64decode(params[1])
430         value = base64.b64decode(params[2])
431         type = int(params[3])
432         value = set_type(type, value)
433         self._testbed.defer_configure(name, value)
434         return "%d|%s" % (OK, "")
435
436     def defer_create_set(self, params):
437         guid = int(params[1])
438         name = base64.b64decode(params[2])
439         value = base64.b64decode(params[3])
440         type = int(params[4])
441         value = set_type(type, value)
442         self._testbed.defer_create_set(guid, name, value)
443         return "%d|%s" % (OK, "")
444
445     def defer_factory_set(self, params):
446         name = base64.b64decode(params[1])
447         value = base64.b64decode(params[2])
448         type = int(params[3])
449         value = set_type(type, value)
450         self._testbed.defer_factory_set(name, value)
451         return "%d|%s" % (OK, "")
452
453     def defer_connect(self, params):
454         guid1 = int(params[1])
455         connector_type_name1 = params[2]
456         guid2 = int(params[3])
457         connector_type_name2 = params[4]
458         self._testbed.defer_connect(guid1, connector_type_name1, guid2, 
459             connector_type_name2)
460         return "%d|%s" % (OK, "")
461
462     def defer_cross_connect(self, params):
463         guid = int(params[1])
464         connector_type_name = params[2]
465         cross_guid = int(params[3])
466         connector_type_name = params[4]
467         cross_guid = int(params[5])
468         cross_testbed_id = params[6]
469         cross_factory_id = params[7]
470         cross_connector_type_name = params[8]
471         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
472             cross_testbed_id, cross_factory_id, cross_connector_type_name)
473         return "%d|%s" % (OK, "")
474
475     def defer_add_trace(self, params):
476         guid = int(params[1])
477         trace_id = params[2]
478         self._testbed.defer_add_trace(guid, trace_id)
479         return "%d|%s" % (OK, "")
480
481     def defer_add_address(self, params):
482         guid = int(params[1])
483         address = params[2]
484         netprefix = int(params[3])
485         broadcast = params[4]
486         self._testbed.defer_add_address(guid, address, netprefix,
487                 broadcast)
488         return "%d|%s" % (OK, "")
489
490     def defer_add_route(self, params):
491         guid = int(params[1])
492         destination = params[2]
493         netprefix = int(params[3])
494         nexthop = params[4]
495         self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
496         return "%d|%s" % (OK, "")
497
498     def do_setup(self, params):
499         self._testbed.do_setup()
500         return "%d|%s" % (OK, "")
501
502     def do_create(self, params):
503         self._testbed.do_create()
504         return "%d|%s" % (OK, "")
505
506     def do_connect(self, params):
507         self._testbed.do_connect()
508         return "%d|%s" % (OK, "")
509
510     def do_configure(self, params):
511         self._testbed.do_configure()
512         return "%d|%s" % (OK, "")
513
514     def do_cross_connect(self, params):
515         self._testbed.do_cross_connect()
516         return "%d|%s" % (OK, "")
517
518     def get(self, params):
519         time = params[1]
520         guid = int(param[2] )
521         name = base64.b64decode(params[3])
522         value = self._testbed.get(time, guid, name)
523         result = base64.b64encode(str(value))
524         return "%d|%s" % (OK, result)
525
526     def set(self, params):
527         time = params[1]
528         guid = int(params[2])
529         name = base64.b64decode(params[3])
530         value = base64.b64decode(params[4])
531         type = int(params[3])
532         value = set_type(type, value)
533         self._testbed.set(time, guid, name, value)
534         return "%d|%s" % (OK, "")
535
536     def get_address(self, params):
537         guid = int(param[1])
538         index = int(param[2])
539         attribute = base64.b64decode(param[3])
540         value = self._testbed.get_address(guid, index, attribute)
541         result = base64.b64encode(str(value))
542         return "%d|%s" % (OK, result)
543
544     def get_route(self, params):
545         guid = int(param[1])
546         index = int(param[2])
547         attribute = base64.b64decode(param[3])
548         value = self._testbed.get_route(guid, index, attribute)
549         result = base64.b64encode(str(value))
550         return "%d|%s" % (OK, result)
551
552     def action(self, params):
553         time = params[1]
554         guid = int(params[2])
555         command = base64.b64decode(params[3])
556         self._testbed.action(time, guid, command)
557         return "%d|%s" % (OK, "")
558
559     def status(self, params):
560         guid = int(params[1])
561         status = self._testbed.status(guid)
562         result = base64.b64encode(str(status))
563         return "%d|%s" % (OK, result)
564  
565 class ExperimentControllerServer(server.Server):
566     def __init__(self, root_dir, log_level, experiment_xml):
567         super(ExperimentControllerServer, self).__init__(root_dir, log_level)
568         self._experiment_xml = experiment_xml
569         self._controller = None
570
571     def post_daemonize(self):
572         from nepi.core.execute import ExperimentController
573         self._controller = ExperimentController(self._experiment_xml, 
574             root_dir = self._root_dir)
575
576     def reply_action(self, msg):
577         if not msg:
578             result = base64.b64encode("Invalid command line")
579             reply = "%d|%s" % (ERROR, result)
580         else:
581             params = msg.split("|")
582             instruction = int(params[0])
583             log_msg(self, params)
584             try:
585                 if instruction == XML:
586                     reply = self.experiment_xml(params)
587                 elif instruction == ACCESS:
588                     reply = self.set_access_configuration(params)
589                 elif instruction == TRACE:
590                     reply = self.trace(params)
591                 elif instruction == FINISHED:
592                     reply = self.is_finished(params)
593                 elif instruction == START:
594                     reply = self.start(params)
595                 elif instruction == STOP:
596                     reply = self.stop(params)
597                 elif instruction == RECOVER:
598                     reply = self.recover(params)
599                 elif instruction == SHUTDOWN:
600                     reply = self.shutdown(params)
601                 else:
602                     error = "Invalid instruction %s" % instruction
603                     self.log_error(error)
604                     result = base64.b64encode(error)
605                     reply = "%d|%s" % (ERROR, result)
606             except:
607                 error = self.log_error()
608                 result = base64.b64encode(error)
609                 reply = "%d|%s" % (ERROR, result)
610         log_reply(self, reply)
611         return reply
612
613     def experiment_xml(self, params):
614         xml = self._controller.experiment_xml
615         result = base64.b64encode(xml)
616         return "%d|%s" % (OK, result)
617
618     def set_access_configuration(self, params):
619         testbed_guid = int(params[1])
620         mode = params[2]
621         communication = params[3]
622         host = params[4]
623         user = params[5]
624         port = int(params[6])
625         root_dir = params[7]
626         use_agent = params[8] == "True"
627         log_level = params[9]
628         access_config = AccessConfiguration()
629         access_config.set_attribute_value("mode", mode)
630         access_config.set_attribute_value("communication", communication)
631         access_config.set_attribute_value("host", host)
632         access_config.set_attribute_value("user", user)
633         access_config.set_attribute_value("port", port)
634         access_config.set_attribute_value("rootDirectory", root_dir)
635         access_config.set_attribute_value("useAgent", use_agent)
636         access_config.set_attribute_value("logLevel", log_level)
637         self._controller.set_access_configuration(testbed_guid, 
638                 access_config)
639         return "%d|%s" % (OK, "")
640
641     def trace(self, params):
642         testbed_guid = int(params[1])
643         guid = int(params[2])
644         trace_id = params[3]
645         attribute = base64.b64decode(params[4])
646         trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
647         result = base64.b64encode(trace)
648         return "%d|%s" % (OK, result)
649
650     def is_finished(self, params):
651         guid = int(params[1])
652         status = self._controller.is_finished(guid)
653         result = base64.b64encode(str(status))
654         return "%d|%s" % (OK, result)
655
656     def start(self, params):
657         self._controller.start()
658         return "%d|%s" % (OK, "")
659
660     def stop(self, params):
661         self._controller.stop()
662         return "%d|%s" % (OK, "")
663
664     def recover(self, params):
665         self._controller.recover()
666         return "%d|%s" % (OK, "")
667
668     def shutdown(self, params):
669         self._controller.shutdown()
670         return "%d|%s" % (OK, "")
671
672 class TestbedControllerProxy(object):
673     def __init__(self, root_dir, log_level, testbed_id = None, 
674             testbed_version = None, launch = True, host = None, 
675             port = None, user = None, agent = None):
676         if launch:
677             if testbed_id == None or testbed_version == None:
678                 raise RuntimeError("To launch a TesbedInstance server a \
679                         testbed_id and testbed_version are required")
680             # ssh
681             if host != None:
682                 python_code = "from nepi.util.proxy import \
683                         TesbedInstanceServer;\
684                         s = TestbedControllerServer('%s', %d, '%s', '%s');\
685                         s.run()" % (root_dir, log_level, testbed_id, 
686                                 testbed_version)
687                 proc = server.popen_ssh_subprocess(python_code, host = host,
688                     port = port, user = user, agent = agent)
689                 if proc.poll():
690                     err = proc.stderr.read()
691                     raise RuntimeError("Server could not be executed: %s" % \
692                             err)
693             else:
694                 # launch daemon
695                 s = TestbedControllerServer(root_dir, log_level, testbed_id, 
696                     testbed_version)
697                 s.run()
698
699         # connect client to server
700         self._client = server.Client(root_dir, host = host, port = port, 
701                 user = user, agent = agent)
702
703     @property
704     def guids(self):
705         msg = testbed_messages[GUIDS]
706         self._client.send_msg(msg)
707         reply = self._client.read_reply()
708         result = reply.split("|")
709         code = int(result[0])
710         text = base64.b64decode(result[1])
711         if code == ERROR:
712             raise RuntimeError(text)
713         return map(int, text.split(","))
714
715     def defer_configure(self, name, value):
716         msg = testbed_messages[CONFIGURE]
717         type = get_type(value)
718         # avoid having "|" in this parameters
719         name = base64.b64encode(name)
720         value = base64.b64encode(str(value))
721         msg = msg % (name, value, type)
722         self._client.send_msg(msg)
723         reply = self._client.read_reply()
724         result = reply.split("|")
725         code = int(result[0])
726         text = base64.b64decode(result[1])
727         if code == ERROR:
728             raise RuntimeError(text)
729
730     def defer_create(self, guid, factory_id):
731         msg = testbed_messages[CREATE]
732         msg = msg % (guid, factory_id)
733         self._client.send_msg(msg)
734         reply = self._client.read_reply()
735         result = reply.split("|")
736         code = int(result[0])
737         text = base64.b64decode(result[1])
738         if code == ERROR:
739             raise RuntimeError(text)
740
741     def defer_create_set(self, guid, name, value):
742         msg = testbed_messages[CREATE_SET]
743         type = get_type(value)
744         # avoid having "|" in this parameters
745         name = base64.b64encode(name)
746         value = base64.b64encode(str(value))
747         msg = msg % (guid, name, value, type)
748         self._client.send_msg(msg)
749         reply = self._client.read_reply()
750         result = reply.split("|")
751         code = int(result[0])
752         text = base64.b64decode(result[1])
753         if code == ERROR:
754             raise RuntimeError(text)
755
756     def defer_factory_set(self, guid, name, value):
757         msg = testbed_messages[FACTORY_SET]
758         type = get_type(value)
759         # avoid having "|" in this parameters
760         name = base64.b64encode(name)
761         value = base64.b64encode(str(value))
762         msg = msg % (guid, name, value, type)
763         self._client.send_msg(msg)
764         reply = self._client.read_reply()
765         result = reply.split("|")
766         code = int(result[0])
767         text = base64.b64decode(result[1])
768         if code == ERROR:
769             raise RuntimeError(text)
770
771     def defer_connect(self, guid1, connector_type_name1, guid2, 
772             connector_type_name2): 
773         msg = testbed_messages[CONNECT]
774         msg = msg % (guid1, connector_type_name1, guid2, 
775             connector_type_name2)
776         self._client.send_msg(msg)
777         reply = self._client.read_reply()
778         result = reply.split("|")
779         code = int(result[0])
780         text = base64.b64decode(result[1])
781         if code == ERROR:
782             raise RuntimeError(text)
783
784     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
785             cross_testbed_id, cross_factory_id, cross_connector_type_name):
786         msg = testbed_messages[CROSS_CONNECT]
787         msg = msg % (guid, connector_type_name, cross_guid, 
788             cross_testbed_id, cross_factory_id, cross_connector_type_name)
789         self._client.send_msg(msg)
790         reply = self._client.read_reply()
791         result = reply.split("|")
792         code = int(result[0])
793         text = base64.b64decode(result[1])
794         if code == ERROR:
795             raise RuntimeError(text)
796
797     def defer_add_trace(self, guid, trace_id):
798         msg = testbed_messages[ADD_TRACE]
799         msg = msg % (guid, trace_id)
800         self._client.send_msg(msg)
801         reply = self._client.read_reply()
802         result = reply.split("|")
803         code = int(result[0])
804         text = base64.b64decode(result[1])
805         if code == ERROR:
806             raise RuntimeError(text)
807
808     def defer_add_address(self, guid, address, netprefix, broadcast): 
809         msg = testbed_messages[ADD_ADDRESS]
810         msg = msg % (guid, address, netprefix, broadcast)
811         self._client.send_msg(msg)
812         reply = self._client.read_reply()
813         result = reply.split("|")
814         code = int(result[0])
815         text = base64.b64decode(result[1])
816         if code == ERROR:
817             raise RuntimeError(text)
818
819     def defer_add_route(self, guid, destination, netprefix, nexthop):
820         msg = testbed_messages[ADD_ROUTE]
821         msg = msg % (guid, destination, netprefix, nexthop)
822         self._client.send_msg(msg)
823         reply = self._client.read_reply()
824         result = reply.split("|")
825         code = int(result[0])
826         text = base64.b64decode(result[1])
827         if code == ERROR:
828             raise RuntimeError(text)
829
830     def do_setup(self):
831         msg = testbed_messages[DO_SETUP]
832         self._client.send_msg(msg)
833         reply = self._client.read_reply()
834         result = reply.split("|")
835         code = int(result[0])
836         text = base64.b64decode(result[1])
837         if code == ERROR:
838             raise RuntimeError(text)
839
840     def do_create(self):
841         msg = testbed_messages[DO_CREATE]
842         self._client.send_msg(msg)
843         reply = self._client.read_reply()
844         result = reply.split("|")
845         code = int(result[0])
846         text = base64.b64decode(result[1])
847         if code == ERROR:
848             raise RuntimeError(text)
849
850     def do_connect(self):
851         msg = testbed_messages[DO_CONNECT]
852         self._client.send_msg(msg)
853         reply = self._client.read_reply()
854         result = reply.split("|")
855         code = int(result[0])
856         text = base64.b64decode(result[1])
857         if code == ERROR:
858             raise RuntimeError(text)
859
860     def do_configure(self):
861         msg = testbed_messages[DO_CONFIGURE]
862         self._client.send_msg(msg)
863         reply = self._client.read_reply()
864         result = reply.split("|")
865         code = int(result[0])
866         text = base64.b64decode(result[1])
867         if code == ERROR:
868             raise RuntimeError(text)
869
870     def do_cross_connect(self):
871         msg = testbed_messages[DO_CROSS_CONNECT]
872         self._client.send_msg(msg)
873         reply = self._client.read_reply()
874         result = reply.split("|")
875         code = int(result[0])
876         text = base64.b64decode(result[1])
877         if code == ERROR:
878             raise RuntimeError(text)
879
880     def start(self, time = TIME_NOW):
881         msg = testbed_messages[START]
882         self._client.send_msg(msg)
883         reply = self._client.read_reply()
884         result = reply.split("|")
885         code = int(result[0])
886         text = base64.b64decode(result[1])
887         if code == ERROR:
888             raise RuntimeError(text)
889
890     def stop(self, time = TIME_NOW):
891         msg = testbed_messages[STOP]
892         self._client.send_msg(msg)
893         reply = self._client.read_reply()
894         result = reply.split("|")
895         code = int(result[0])
896         text = base64.b64decode(result[1])
897         if code == ERROR:
898             raise RuntimeError(text)
899
900     def set(self, time, guid, name, value):
901         msg = testbed_messages[SET]
902         type = get_type(value)
903         # avoid having "|" in this parameters
904         name = base64.b64encode(name)
905         value = base64.b64encode(str(value))
906         msg = msg % (time, guid, name, value, type)
907         self._client.send_msg(msg)
908         reply = self._client.read_reply()
909         result = reply.split("|")
910         code = int(result[0])
911         text = base64.b64decode(result[1])
912         if code == ERROR:
913             raise RuntimeError(text)
914
915     def get(self, time, guid, name):
916         msg = testbed_messages[GET]
917         # avoid having "|" in this parameters
918         name = base64.b64encode(name)
919         msg = msg % (time, guid, name)
920         self._client.send_msg(msg)
921         reply = self._client.read_reply()
922         result = reply.split("|")
923         code = int(result[0])
924         text = base64.b64decode(result[1])
925         if code == ERROR:
926             raise RuntimeError(text)
927         return text
928
929     def get_address(self, guid, index, attribute):
930         msg = testbed_messages[GET_ADDRESS]
931         # avoid having "|" in this parameters
932         attribute = base64.b64encode(attribute)
933         msg = msg % (guid, index, attribute)
934         self._client.send_msg(msg)
935         reply = self._client.read_reply()
936         result = reply.split("|")
937         code = int(result[0])
938         text = base64.b64decode(result[1])
939         if code == ERROR:
940             raise RuntimeError(text)
941         return text
942
943     def get_route(self, guid, index, attribute):
944         msg = testbed_messages[GET_ROUTE]
945         # avoid having "|" in this parameters
946         attribute = base64.b64encode(attribute)
947         msg = msg % (guid, index, attribute)
948         self._client.send_msg(msg)
949         reply = self._client.read_reply()
950         result = reply.split("|")
951         code = int(result[0])
952         text = base64.b64decode(result[1])
953         if code == ERROR:
954             raise RuntimeError(text)
955         return text
956
957     def action(self, time, guid, action):
958         msg = testbed_messages[ACTION]
959         msg = msg % (time, guid, action)
960         self._client.send_msg(msg)
961         reply = self._client.read_reply()
962         result = reply.split("|")
963         code = int(result[0])
964         text = base64.b64decode(result[1])
965         if code == ERROR:
966             raise RuntimeError(text)
967
968     def status(self, guid):
969         msg = testbed_messages[STATUS]
970         msg = msg % (guid)
971         self._client.send_msg(msg)
972         reply = self._client.read_reply()
973         result = reply.split("|")
974         code = int(result[0])
975         text = base64.b64decode(result[1])
976         if code == ERROR:
977             raise RuntimeError(text)
978         return int(text)
979
980     def trace(self, guid, trace_id, attribute='value'):
981         msg = testbed_messages[TRACE]
982         attribute = base64.b64encode(attribute)
983         msg = msg % (guid, trace_id, attribute)
984         self._client.send_msg(msg)
985         reply = self._client.read_reply()
986         result = reply.split("|")
987         code = int(result[0])
988         text = base64.b64decode(result[1])
989         if code == ERROR:
990             raise RuntimeError(text)
991         return text
992
993     def shutdown(self):
994         msg = testbed_messages[SHUTDOWN]
995         self._client.send_msg(msg)
996         reply = self._client.read_reply()
997         result = reply.split("|")
998         code = int(result[0])
999         text = base64.b64decode(result[1])
1000         if code == ERROR:
1001             raise RuntimeError(text)
1002         self._client.send_stop()
1003         self._client.read_reply() # wait for it
1004
1005 class ExperimentControllerProxy(object):
1006     def __init__(self, root_dir, log_level, experiment_xml = None, 
1007             launch = True, host = None, port = None, user = None, 
1008             agent = None):
1009         if launch:
1010             # launch server
1011             if experiment_xml == None:
1012                 raise RuntimeError("To launch a ExperimentControllerServer a \
1013                         xml description of the experiment is required")
1014             # ssh
1015             if host != None:
1016                 xml = experiment_xml
1017                 python_code = "from nepi.util.proxy import ExperimentControllerServer;\
1018                         s = ExperimentControllerServer(%r, %r, %r);\
1019                         s.run()" % (root_dir, log_level, xml)
1020                 proc = server.popen_ssh_subprocess(python_code, host = host,
1021                     port = port, user = user, agent = agent)
1022                 if proc.poll():
1023                     err = proc.stderr.read()
1024                     raise RuntimeError("Server could not be executed: %s" % \
1025                             err)
1026             else:
1027                 # launch daemon
1028                 s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
1029                 s.run()
1030
1031         # connect client to server
1032         self._client = server.Client(root_dir, host = host, port = port, 
1033                 user = user, agent = agent)
1034
1035     @property
1036     def experiment_xml(self):
1037         msg = controller_messages[XML]
1038         self._client.send_msg(msg)
1039         reply = self._client.read_reply()
1040         result = reply.split("|")
1041         code = int(result[0])
1042         text = base64.b64decode(result[1])
1043         if code == ERROR:
1044             raise RuntimeError(text)
1045         return text
1046
1047     def set_access_configuration(self, testbed_guid, access_config):
1048         mode = access_config.get_attribute_value("mode")
1049         communication = access_config.get_attribute_value("communication")
1050         host = access_config.get_attribute_value("host")
1051         user = access_config.get_attribute_value("user")
1052         port = access_config.get_attribute_value("port")
1053         root_dir = access_config.get_attribute_value("rootDirectory")
1054         use_agent = access_config.get_attribute_value("useAgent")
1055         log_level = access_config.get_attribute_value("logLevel")
1056         msg = controller_messages[ACCESS]
1057         msg = msg % (testbed_guid, mode, communication, host, user, port, 
1058                 root_dir, use_agent, log_level)
1059         self._client.send_msg(msg)
1060         reply = self._client.read_reply()
1061         result = reply.split("|")
1062         code = int(result[0])
1063         text =  base64.b64decode(result[1])
1064         if code == ERROR:
1065             raise RuntimeError(text)
1066
1067     def trace(self, testbed_guid, guid, trace_id, attribute='value'):
1068         msg = controller_messages[TRACE]
1069         attribute = base64.b64encode(attribute)
1070         msg = msg % (testbed_guid, guid, trace_id, attribute)
1071         self._client.send_msg(msg)
1072         reply = self._client.read_reply()
1073         result = reply.split("|")
1074         code = int(result[0])
1075         text =  base64.b64decode(result[1])
1076         if code == OK:
1077             return text
1078         raise RuntimeError(text)
1079
1080     def start(self):
1081         msg = controller_messages[START]
1082         self._client.send_msg(msg)
1083         reply = self._client.read_reply()
1084         result = reply.split("|")
1085         code = int(result[0])
1086         text =  base64.b64decode(result[1])
1087         if code == ERROR:
1088             raise RuntimeError(text)
1089
1090     def stop(self):
1091         msg = controller_messages[STOP]
1092         self._client.send_msg(msg)
1093         reply = self._client.read_reply()
1094         result = reply.split("|")
1095         code = int(result[0])
1096         text =  base64.b64decode(result[1])
1097         if code == ERROR:
1098             raise RuntimeError(text)
1099
1100     def recover(self):
1101         msg = controller_messages[RECOVER]
1102         self._client.send_msg(msg)
1103         reply = self._client.read_reply()
1104         result = reply.split("|")
1105         code = int(result[0])
1106         text =  base64.b64decode(result[1])
1107         if code == ERROR:
1108             raise RuntimeError(text)
1109
1110     def is_finished(self, guid):
1111         msg = controller_messages[FINISHED]
1112         msg = msg % guid
1113         self._client.send_msg(msg)
1114         reply = self._client.read_reply()
1115         result = reply.split("|")
1116         code = int(result[0])
1117         text = base64.b64decode(result[1])
1118         if code == ERROR:
1119             raise RuntimeError(text)
1120         return text == "True"
1121
1122     def shutdown(self):
1123         msg = controller_messages[SHUTDOWN]
1124         self._client.send_msg(msg)
1125         reply = self._client.read_reply()
1126         result = reply.split("|")
1127         code = int(result[0])
1128         text =  base64.b64decode(result[1])
1129         if code == ERROR:
1130             raise RuntimeError(text)
1131         self._client.send_stop()
1132         self._client.read_reply() # wait for it
1133