9470ad5e64311592d8dd98c850fd29d32a37a75a
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW
8 import getpass
9 import cPickle
10 import sys
11 import time
12 import tempfile
13 import shutil
14
# PROTOCOL REPLIES
OK = 0
ERROR = 1

# PROTOCOL INSTRUCTION MESSAGES
XML = 2
ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36

# PARAMETER TYPE
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103

def _build_messages(specs):
    """Return a dict mapping each instruction code to its message format.

    ``specs`` is an iterable of ``(code, params)`` pairs, where ``params`` is
    the "|"-separated format of the message parameters, or None when the
    message carries no parameters. The instruction code is always emitted as
    the first field, so the key of an entry and the code embedded in its
    format string can never disagree.
    """
    messages = {}
    for code, params in specs:
        if params is None:
            messages[code] = "%d" % code
        else:
            messages[code] = "%d|%s" % (code, params)
    return messages

# EXPERIMENT CONTROLER PROTOCOL MESSAGES
controller_messages = _build_messages([
    (XML, None),
    (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
    (TRACE, "%d|%d|%s|%s"),
    (FINISHED, "%d"),
    (START, None),
    (STOP, None),
    (RECOVER, None),
    (SHUTDOWN, None),
    ])

# TESTBED INSTANCE PROTOCOL MESSAGES
testbed_messages = _build_messages([
    (TRACE, "%d|%s|%s"),
    (START, None),
    (STOP, None),
    (SHUTDOWN, None),
    (CONFIGURE, "%s|%s|%d"),
    (CREATE, "%d|%s"),
    (CREATE_SET, "%d|%s|%s|%d"),
    (FACTORY_SET, "%d|%s|%s|%d"),
    (CONNECT, "%d|%s|%d|%s"),
    (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
    (ADD_TRACE, "%d|%s"),
    (ADD_ADDRESS, "%d|%s|%d|%s"),
    (ADD_ROUTE, "%d|%s|%d|%s"),
    (DO_SETUP, None),
    (DO_CREATE, None),
    (DO_CONNECT_INIT, None),
    (DO_CONNECT_COMPL, None),
    (DO_CONFIGURE, None),
    (DO_PRECONFIGURE, None),
    (DO_CROSS_CONNECT_INIT, "%s"),
    (DO_CROSS_CONNECT_COMPL, "%s"),
    (GET, "%s|%d|%s"),
    (SET, "%s|%d|%s|%s|%d"),
    # These two used to be emitted with the GET instruction code, which made
    # the server dispatch them to its GET handler.
    (GET_ROUTE, "%d|%d|%s"),
    (GET_ADDRESS, "%d|%d|%s"),
    (ACTION, "%s|%d|%s"),
    (STATUS, "%s"),
    (GUIDS, None),
    # TestbedControllerServer.get_attribute_list reads a guid from the first
    # parameter, so the message needs a slot for it.
    (GET_ATTRIBUTE_LIST, "%d"),
    (TESTBED_ID, None),
    (TESTBED_VERSION, None),
    ])

# Human readable names of reply codes, instructions and parameter types,
# used when logging protocol traffic.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    ACCESS: "ACCESS",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    RECOVER: "RECOVER",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    GET: "GET",
    SET: "SET",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    }
152
def get_type(value):
    """Return the protocol parameter type code that describes ``value``.

    Yields BOOL, INTEGER or FLOAT for instances of the matching Python type
    and STRING for anything else.
    """
    # bool is tested first because it is a subclass of int.
    for python_type, type_code in ((bool, BOOL), (int, INTEGER),
            (float, FLOAT)):
        if isinstance(value, python_type):
            return type_code
    return STRING
162
def set_type(type, value):
    """Convert ``value`` to the Python type named by the ``type`` code.

    INTEGER and FLOAT go through int() and float(), BOOL is true only for
    the exact text "True", and every other code falls back to str().
    """
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    if type == BOOL:
        return value == "True"
    return str(value)
173
def log_msg(server, params):
    """Write a debug log entry describing an incoming protocol message.

    ``params`` is the message already split on "|": the first element is the
    numeric instruction code and the rest are its parameters.

    Unknown instruction codes are logged as "UNKNOWN(<code>)" instead of
    raising KeyError, so that the caller gets the chance to answer them with
    its own "Invalid instruction" error reply.
    """
    instr = int(params[0])
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
        instr_txt, ", ".join(map(str, params[1:]))))
179
def log_reply(server, reply):
    """Write a debug log entry describing an outgoing reply.

    ``reply`` has the form "<code>|<base64 payload>"; the code is shown by
    name and the payload is decoded before being logged.
    """
    fields = reply.split("|")
    code_txt = instruction_text[int(fields[0])]
    payload = base64.b64decode(fields[1])
    owner = server.__class__.__name__
    server.log_debug("%s - reply: %s %s" % (owner, code_txt, payload))
187
def to_server_log_level(log_level):
    """Map an AccessConfiguration log level name to the server module level.

    Only AccessConfiguration.DEBUG_LEVEL selects server.DEBUG_LEVEL; any
    other value selects server.ERROR_LEVEL.
    """
    if log_level == AccessConfiguration.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
192
def get_access_config_params(access_config):
    """Extract the connection parameters stored in ``access_config``.

    Returns the tuple (root_dir, log_level, user, host, port, agent), where
    log_level is already translated to the server module's level. The ssh
    related values (user, host, port, agent) are None unless the configured
    communication mode is AccessConfiguration.ACCESS_SSH.
    """
    read = access_config.get_attribute_value
    root_dir = read("rootDirectory")
    log_level = to_server_log_level(read("logLevel"))
    if read("communication") == AccessConfiguration.ACCESS_SSH:
        user = read("user")
        host = read("host")
        port = read("port")
        agent = read("useAgent")
    else:
        user = host = port = agent = None
    return (root_dir, log_level, user, host, port, agent)
205
class AccessConfiguration(AttributesMap):
    """Attribute map describing how a controller instance is run and reached.

    The attributes registered in __init__ are: mode, communication, host,
    user, port, rootDirectory, useAgent, logLevel and recover.
    """
    # Allowed values of the "mode" attribute.
    MODE_SINGLE_PROCESS = "SINGLE"
    MODE_DAEMON = "DAEMON"
    # Allowed values of the "communication" attribute.
    ACCESS_SSH = "SSH"
    ACCESS_LOCAL = "LOCAL"
    # Allowed values of the "logLevel" attribute.
    ERROR_LEVEL = "Error"
    DEBUG_LEVEL = "Debug"

    def __init__(self):
        """Register every access attribute with its default value."""
        super(AccessConfiguration, self).__init__()
        conf = AccessConfiguration
        add = self.add_attribute

        add(name="mode",
            help="Instance execution mode",
            type=Attribute.ENUM,
            value=conf.MODE_SINGLE_PROCESS,
            allowed=[conf.MODE_DAEMON, conf.MODE_SINGLE_PROCESS],
            validation_function=validation.is_enum)
        add(name="communication",
            help="Instance communication mode",
            type=Attribute.ENUM,
            value=conf.ACCESS_LOCAL,
            allowed=[conf.ACCESS_LOCAL, conf.ACCESS_SSH],
            validation_function=validation.is_enum)
        add(name="host",
            help="Host where the testbed will be executed",
            type=Attribute.STRING,
            value="localhost",
            validation_function=validation.is_string)
        # The default user is whoever is running this process.
        add(name="user",
            help="User on the Host to execute the testbed",
            type=Attribute.STRING,
            value=getpass.getuser(),
            validation_function=validation.is_string)
        add(name="port",
            help="Port on the Host",
            type=Attribute.INTEGER,
            value=22,
            validation_function=validation.is_integer)
        # TODO: validation.is_path
        add(name="rootDirectory",
            help="Root directory for storing process files",
            type=Attribute.STRING,
            value=".",
            validation_function=validation.is_string)
        add(name="useAgent",
            help="Use -A option for forwarding of the authentication agent, if ssh access is used",
            type=Attribute.BOOL,
            value=False,
            validation_function=validation.is_bool)
        add(name="logLevel",
            help="Log level for instance",
            type=Attribute.ENUM,
            value=conf.ERROR_LEVEL,
            allowed=[conf.ERROR_LEVEL, conf.DEBUG_LEVEL],
            validation_function=validation.is_enum)
        add(name="recover",
            help="Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
            type=Attribute.BOOL,
            value=False,
            validation_function=validation.is_bool)
267
class TempDir(object):
    """Freshly created temporary directory, removed when the object dies.

    The directory is created with tempfile.mkdtemp() and its location is
    available as ``path``.
    """
    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        """Remove the directory tree, never raising from the finalizer."""
        # `path` is missing when mkdtemp() failed inside __init__.
        path = getattr(self, "path", None)
        if path is not None:
            # Best effort: the directory may already have been removed.
            shutil.rmtree(path, ignore_errors=True)
274
class PermDir(object):
    """Holder for a caller supplied directory path.

    Exposes the same ``path`` attribute as TempDir but never removes
    anything.
    """
    def __init__(self, path):
        """Remember ``path`` as given."""
        self.path = path
278
def create_controller(xml, access_config = None):
    """Create an experiment controller for the experiment described by xml.

    Without an access configuration, or in MODE_SINGLE_PROCESS, an
    ExperimentController is created in this process. In MODE_DAEMON an
    ExperimentControllerProxy is returned instead, launching a new server
    unless the "recover" attribute is set.

    Raises ValueError when recovery is requested in single process mode and
    RuntimeError when the mode is not one of the supported ones.
    """
    mode = None
    launch = True
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if not access_config or not access_config.has_attribute("rootDirectory"):
            root_dir = TempDir()
        else:
            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
        controller = ExperimentController(xml, root_dir.path)

        # inject reference to temporary dir, so that it gets cleaned
        # up at destruction time.
        controller._tempdir = root_dir

        return controller

    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml = xml, host = host, port = port, user = user,
                agent = agent, launch = launch)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
308
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """Create a testbed controller for the given testbed id and version.

    Without an access configuration, or in MODE_SINGLE_PROCESS, the testbed
    controller is built in this process. In MODE_DAEMON a
    TestbedControllerProxy is returned instead, launching a new server
    unless the "recover" attribute is set.

    Raises ValueError when recovery is requested in single process mode and
    RuntimeError when the mode is not one of the supported ones.
    """
    mode = None
    launch = True
    if access_config:
        mode = access_config.get_attribute_value("mode")
        launch = not access_config.get_attribute_value("recover")

    if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)

    if mode == AccessConfiguration.MODE_DAEMON:
        (root_dir, log_level, user, host, port, agent) = \
                get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level,
                testbed_id = testbed_id, testbed_version = testbed_version,
                host = host, port = port, user = user, agent = agent,
                launch = launch)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
325
326 def _build_testbed_controller(testbed_id, testbed_version):
327     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
328     if not mod_name in sys.modules:
329         __import__(mod_name)
330     module = sys.modules[mod_name]
331     return module.TestbedController(testbed_version)
332
class TestbedControllerServer(server.Server):
    """Server that executes testbed protocol messages on a testbed controller.

    Every message is a "|"-separated string whose first field is the
    instruction code. Every reply has the form "<OK|ERROR>|<base64 payload>".
    """
    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # Built in post_daemonize().
        self._testbed = None

    def post_daemonize(self):
        """Build the testbed controller this server delegates to."""
        self._testbed = _build_testbed_controller(self._testbed_id,
                self._testbed_version)

    def _instruction_handlers(self):
        """Return the mapping from instruction code to handler method."""
        return {
            TRACE: self.trace,
            START: self.start,
            STOP: self.stop,
            SHUTDOWN: self.shutdown,
            CONFIGURE: self.defer_configure,
            CREATE: self.defer_create,
            CREATE_SET: self.defer_create_set,
            FACTORY_SET: self.defer_factory_set,
            CONNECT: self.defer_connect,
            CROSS_CONNECT: self.defer_cross_connect,
            ADD_TRACE: self.defer_add_trace,
            ADD_ADDRESS: self.defer_add_address,
            ADD_ROUTE: self.defer_add_route,
            DO_SETUP: self.do_setup,
            DO_CREATE: self.do_create,
            DO_CONNECT_INIT: self.do_connect_init,
            DO_CONNECT_COMPL: self.do_connect_compl,
            DO_CONFIGURE: self.do_configure,
            DO_PRECONFIGURE: self.do_preconfigure,
            DO_CROSS_CONNECT_INIT: self.do_cross_connect_init,
            DO_CROSS_CONNECT_COMPL: self.do_cross_connect_compl,
            GET: self.get,
            SET: self.set,
            GET_ADDRESS: self.get_address,
            GET_ROUTE: self.get_route,
            ACTION: self.action,
            STATUS: self.status,
            GUIDS: self.guids,
            GET_ATTRIBUTE_LIST: self.get_attribute_list,
            TESTBED_ID: self.testbed_id,
            TESTBED_VERSION: self.testbed_version,
            }

    def _ok_reply(self, result = ""):
        """Return an OK reply carrying the already encoded ``result``."""
        return "%d|%s" % (OK, result)

    def reply_action(self, msg):
        """Execute the instruction in ``msg`` and return the reply string.

        An empty message, an unknown instruction or any exception raised by
        a handler produces an ERROR reply with a base64 encoded description.
        """
        if not msg:
            result = base64.b64encode("Invalid command line")
            reply = "%d|%s" % (ERROR, result)
        else:
            params = msg.split("|")
            instruction = int(params[0])
            log_msg(self, params)
            try:
                handler = self._instruction_handlers().get(instruction)
                if handler is not None:
                    reply = handler(params)
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    result = base64.b64encode(error)
                    reply = "%d|%s" % (ERROR, result)
            except:
                # Deliberately broad: whatever goes wrong in a handler is
                # logged and reported to the client as an ERROR reply.
                error = self.log_error()
                result = base64.b64encode(error)
                reply = "%d|%s" % (ERROR, result)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """Reply with the pickled guids of the testbed."""
        value = cPickle.dumps(self._testbed.guids)
        return self._ok_reply(base64.b64encode(value))

    def testbed_id(self, params):
        """Reply with the testbed id as text."""
        testbed_id = self._testbed.testbed_id
        return self._ok_reply(base64.b64encode(str(testbed_id)))

    def testbed_version(self, params):
        """Reply with the testbed version as text."""
        testbed_version = self._testbed.testbed_version
        return self._ok_reply(base64.b64encode(str(testbed_version)))

    def defer_create(self, params):
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return self._ok_reply()

    def trace(self, params):
        """Reply with the requested trace attribute, base64 encoded."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        trace = self._testbed.trace(guid, trace_id, attribute)
        return self._ok_reply(base64.b64encode(trace))

    def start(self, params):
        self._testbed.start()
        return self._ok_reply()

    def stop(self, params):
        self._testbed.stop()
        return self._ok_reply()

    def shutdown(self, params):
        self._testbed.shutdown()
        return self._ok_reply()

    def defer_configure(self, params):
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type = int(params[3])
        value = set_type(type, value)
        self._testbed.defer_configure(name, value)
        return self._ok_reply()

    def defer_create_set(self, params):
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = base64.b64decode(params[3])
        type = int(params[4])
        value = set_type(type, value)
        self._testbed.defer_create_set(guid, name, value)
        return self._ok_reply()

    def defer_factory_set(self, params):
        # NOTE(review): the FACTORY_SET message format declares four
        # parameters starting with an integer, but only three are read here
        # and no guid is forwarded. Confirm against the client side.
        name = base64.b64decode(params[1])
        value = base64.b64decode(params[2])
        type = int(params[3])
        value = set_type(type, value)
        self._testbed.defer_factory_set(name, value)
        return self._ok_reply()

    def defer_connect(self, params):
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2,
            connector_type_name2)
        return self._ok_reply()

    def defer_cross_connect(self, params):
        # Field order follows the CROSS_CONNECT message format
        # "%d|%s|%d|%d|%s|%s", which carries exactly six parameters.
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        # NOTE(review): sent with %d, hence converted back to int; confirm
        # that the testbed expects a number here.
        cross_testbed_id = int(params[4])
        cross_factory_id = params[5]
        cross_connector_type_name = params[6]
        self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name)
        return self._ok_reply()

    def defer_add_trace(self, params):
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return self._ok_reply()

    def defer_add_address(self, params):
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix,
                broadcast)
        return self._ok_reply()

    def defer_add_route(self, params):
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return self._ok_reply()

    def do_setup(self, params):
        self._testbed.do_setup()
        return self._ok_reply()

    def do_create(self, params):
        self._testbed.do_create()
        return self._ok_reply()

    def do_connect_init(self, params):
        self._testbed.do_connect_init()
        return self._ok_reply()

    def do_connect_compl(self, params):
        self._testbed.do_connect_compl()
        return self._ok_reply()

    def do_configure(self, params):
        self._testbed.do_configure()
        return self._ok_reply()

    def do_preconfigure(self, params):
        self._testbed.do_preconfigure()
        return self._ok_reply()

    def do_cross_connect_init(self, params):
        pcross_data = base64.b64decode(params[1])
        # SECURITY: unpickling executes arbitrary code when the peer is not
        # trusted; only expose this server to trusted clients.
        cross_data = cPickle.loads(pcross_data)
        self._testbed.do_cross_connect_init(cross_data)
        return self._ok_reply()

    def do_cross_connect_compl(self, params):
        pcross_data = base64.b64decode(params[1])
        # SECURITY: see do_cross_connect_init.
        cross_data = cPickle.loads(pcross_data)
        self._testbed.do_cross_connect_compl(cross_data)
        return self._ok_reply()

    def get(self, params):
        """Reply with the text form of an attribute value."""
        time = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = self._testbed.get(time, guid, name)
        return self._ok_reply(base64.b64encode(str(value)))

    def set(self, params):
        """Set an attribute value on the testbed."""
        time = params[1]
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = base64.b64decode(params[4])
        # The type code is the fifth parameter of the SET message.
        type = int(params[5])
        value = set_type(type, value)
        self._testbed.set(time, guid, name, value)
        return self._ok_reply()

    def get_address(self, params):
        """Reply with the text form of an address attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        return self._ok_reply(base64.b64encode(str(value)))

    def get_route(self, params):
        """Reply with the text form of a route attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        return self._ok_reply(base64.b64encode(str(value)))

    def action(self, params):
        time = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(time, guid, command)
        return self._ok_reply()

    def status(self, params):
        """Reply with the status of a guid, or of no guid for "None"."""
        guid = None
        if params[1] != "None":
            guid = int(params[1])
        status = self._testbed.status(guid)
        return self._ok_reply(base64.b64encode(str(status)))

    def get_attribute_list(self, params):
        """Reply with the pickled attribute list of a guid."""
        guid = int(params[1])
        attr_list = self._testbed.get_attribute_list(guid)
        value = cPickle.dumps(attr_list)
        return self._ok_reply(base64.b64encode(value))
630
class ExperimentControllerServer(server.Server):
    """Server that executes controller protocol messages on an experiment.

    Messages are "|"-separated strings starting with the instruction code;
    replies have the form "<OK|ERROR>|<base64 payload>".
    """
    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # Built in post_daemonize().
        self._controller = None

    def post_daemonize(self):
        """Build the ExperimentController this server delegates to."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml,
            root_dir = self._root_dir)

    def reply_action(self, msg):
        """Execute the instruction in ``msg`` and return the reply string."""
        if msg:
            params = msg.split("|")
            instruction = int(params[0])
            log_msg(self, params)
            try:
                handlers = {
                    XML: self.experiment_xml,
                    ACCESS: self.set_access_configuration,
                    TRACE: self.trace,
                    FINISHED: self.is_finished,
                    START: self.start,
                    STOP: self.stop,
                    RECOVER: self.recover,
                    SHUTDOWN: self.shutdown,
                    }
                if instruction in handlers:
                    reply = handlers[instruction](params)
                else:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = "%d|%s" % (ERROR, base64.b64encode(error))
            except:
                # Any failure is logged and reported as an ERROR reply.
                error = self.log_error()
                reply = "%d|%s" % (ERROR, base64.b64encode(error))
        else:
            reply = "%d|%s" % (ERROR, base64.b64encode("Invalid command line"))
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """Reply with the experiment xml, base64 encoded."""
        encoded = base64.b64encode(self._controller.experiment_xml)
        return "%d|%s" % (OK, encoded)

    def set_access_configuration(self, params):
        """Rebuild an AccessConfiguration and assign it to a testbed guid."""
        testbed_guid = int(params[1])
        # Values in the order in which they are applied to the configuration.
        settings = (
            ("mode", params[2]),
            ("communication", params[3]),
            ("host", params[4]),
            ("user", params[5]),
            ("port", int(params[6])),
            ("rootDirectory", params[7]),
            ("useAgent", params[8] == "True"),
            ("logLevel", params[9]),
            )
        access_config = AccessConfiguration()
        for attribute_name, attribute_value in settings:
            access_config.set_attribute_value(attribute_name, attribute_value)
        self._controller.set_access_configuration(testbed_guid,
                access_config)
        return "%d|%s" % (OK, "")

    def trace(self, params):
        """Reply with the requested trace attribute, base64 encoded."""
        testbed_guid, guid = int(params[1]), int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        content = self._controller.trace(testbed_guid, guid, trace_id,
                attribute)
        return "%d|%s" % (OK, base64.b64encode(content))

    def is_finished(self, params):
        """Reply with the text form of the finished state of a guid."""
        finished = self._controller.is_finished(int(params[1]))
        return "%d|%s" % (OK, base64.b64encode(str(finished)))

    def start(self, params):
        self._controller.start()
        return "%d|%s" % (OK, "")

    def stop(self, params):
        self._controller.stop()
        return "%d|%s" % (OK, "")

    def recover(self, params):
        self._controller.recover()
        return "%d|%s" % (OK, "")

    def shutdown(self, params):
        self._controller.shutdown()
        return "%d|%s" % (OK, "")
737
738 class TestbedControllerProxy(object):
739     def __init__(self, root_dir, log_level, testbed_id = None, 
740             testbed_version = None, launch = True, host = None, 
741             port = None, user = None, agent = None):
742         if launch:
743             if testbed_id == None or testbed_version == None:
744                 raise RuntimeError("To launch a TesbedInstance server a \
745                         testbed_id and testbed_version are required")
746             # ssh
747             if host != None:
748                 python_code = "from nepi.util.proxy import \
749                         TesbedInstanceServer;\
750                         s = TestbedControllerServer('%s', %d, '%s', '%s');\
751                         s.run()" % (root_dir, log_level, testbed_id, 
752                                 testbed_version)
753                 proc = server.popen_ssh_subprocess(python_code, host = host,
754                     port = port, user = user, agent = agent)
755                 if proc.poll():
756                     err = proc.stderr.read()
757                     raise RuntimeError("Server could not be executed: %s" % \
758                             err)
759             else:
760                 # launch daemon
761                 s = TestbedControllerServer(root_dir, log_level, testbed_id, 
762                     testbed_version)
763                 s.run()
764
765         # connect client to server
766         self._client = server.Client(root_dir, host = host, port = port, 
767                 user = user, agent = agent)
768
769     @property
770     def guids(self):
771         msg = testbed_messages[GUIDS]
772         self._client.send_msg(msg)
773         reply = self._client.read_reply()
774         result = reply.split("|")
775         code = int(result[0])
776         text = base64.b64decode(result[1])
777         if code == ERROR:
778             raise RuntimeError(text)
779         guids = cPickle.loads(text)
780         return guids
781
782     @property
783     def testbed_id(self):
784         msg = testbed_messages[TESTBED_ID]
785         self._client.send_msg(msg)
786         reply = self._client.read_reply()
787         result = reply.split("|")
788         code = int(result[0])
789         text = base64.b64decode(result[1])
790         if code == ERROR:
791             raise RuntimeError(text)
792         return int(text)
793
794     @property
795     def testbed_version(self):
796         msg = testbed_messages[TESTBED_VERSION]
797         self._client.send_msg(msg)
798         reply = self._client.read_reply()
799         result = reply.split("|")
800         code = int(result[0])
801         text = base64.b64decode(result[1])
802         if code == ERROR:
803             raise RuntimeError(text)
804         return int(text)
805
806     def defer_configure(self, name, value):
807         msg = testbed_messages[CONFIGURE]
808         type = get_type(value)
809         # avoid having "|" in this parameters
810         name = base64.b64encode(name)
811         value = base64.b64encode(str(value))
812         msg = msg % (name, value, type)
813         self._client.send_msg(msg)
814         reply = self._client.read_reply()
815         result = reply.split("|")
816         code = int(result[0])
817         text = base64.b64decode(result[1])
818         if code == ERROR:
819             raise RuntimeError(text)
820
821     def defer_create(self, guid, factory_id):
822         msg = testbed_messages[CREATE]
823         msg = msg % (guid, factory_id)
824         self._client.send_msg(msg)
825         reply = self._client.read_reply()
826         result = reply.split("|")
827         code = int(result[0])
828         text = base64.b64decode(result[1])
829         if code == ERROR:
830             raise RuntimeError(text)
831
832     def defer_create_set(self, guid, name, value):
833         msg = testbed_messages[CREATE_SET]
834         type = get_type(value)
835         # avoid having "|" in this parameters
836         name = base64.b64encode(name)
837         value = base64.b64encode(str(value))
838         msg = msg % (guid, name, value, type)
839         self._client.send_msg(msg)
840         reply = self._client.read_reply()
841         result = reply.split("|")
842         code = int(result[0])
843         text = base64.b64decode(result[1])
844         if code == ERROR:
845             raise RuntimeError(text)
846
847     def defer_factory_set(self, guid, name, value):
848         msg = testbed_messages[FACTORY_SET]
849         type = get_type(value)
850         # avoid having "|" in this parameters
851         name = base64.b64encode(name)
852         value = base64.b64encode(str(value))
853         msg = msg % (guid, name, value, type)
854         self._client.send_msg(msg)
855         reply = self._client.read_reply()
856         result = reply.split("|")
857         code = int(result[0])
858         text = base64.b64decode(result[1])
859         if code == ERROR:
860             raise RuntimeError(text)
861
862     def defer_connect(self, guid1, connector_type_name1, guid2, 
863             connector_type_name2): 
864         msg = testbed_messages[CONNECT]
865         msg = msg % (guid1, connector_type_name1, guid2, 
866             connector_type_name2)
867         self._client.send_msg(msg)
868         reply = self._client.read_reply()
869         result = reply.split("|")
870         code = int(result[0])
871         text = base64.b64decode(result[1])
872         if code == ERROR:
873             raise RuntimeError(text)
874
875     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
876             cross_testbed_id, cross_factory_id, cross_connector_type_name):
877         msg = testbed_messages[CROSS_CONNECT]
878         msg = msg % (guid, connector_type_name, cross_guid, 
879             cross_testbed_id, cross_factory_id, cross_connector_type_name)
880         self._client.send_msg(msg)
881         reply = self._client.read_reply()
882         result = reply.split("|")
883         code = int(result[0])
884         text = base64.b64decode(result[1])
885         if code == ERROR:
886             raise RuntimeError(text)
887
888     def defer_add_trace(self, guid, trace_id):
889         msg = testbed_messages[ADD_TRACE]
890         msg = msg % (guid, trace_id)
891         self._client.send_msg(msg)
892         reply = self._client.read_reply()
893         result = reply.split("|")
894         code = int(result[0])
895         text = base64.b64decode(result[1])
896         if code == ERROR:
897             raise RuntimeError(text)
898
899     def defer_add_address(self, guid, address, netprefix, broadcast): 
900         msg = testbed_messages[ADD_ADDRESS]
901         msg = msg % (guid, address, netprefix, broadcast)
902         self._client.send_msg(msg)
903         reply = self._client.read_reply()
904         result = reply.split("|")
905         code = int(result[0])
906         text = base64.b64decode(result[1])
907         if code == ERROR:
908             raise RuntimeError(text)
909
910     def defer_add_route(self, guid, destination, netprefix, nexthop):
911         msg = testbed_messages[ADD_ROUTE]
912         msg = msg % (guid, destination, netprefix, nexthop)
913         self._client.send_msg(msg)
914         reply = self._client.read_reply()
915         result = reply.split("|")
916         code = int(result[0])
917         text = base64.b64decode(result[1])
918         if code == ERROR:
919             raise RuntimeError(text)
920
921     def do_setup(self):
922         msg = testbed_messages[DO_SETUP]
923         self._client.send_msg(msg)
924         reply = self._client.read_reply()
925         result = reply.split("|")
926         code = int(result[0])
927         text = base64.b64decode(result[1])
928         if code == ERROR:
929             raise RuntimeError(text)
930
931     def do_create(self):
932         msg = testbed_messages[DO_CREATE]
933         self._client.send_msg(msg)
934         reply = self._client.read_reply()
935         result = reply.split("|")
936         code = int(result[0])
937         text = base64.b64decode(result[1])
938         if code == ERROR:
939             raise RuntimeError(text)
940
941     def do_connect_init(self):
942         msg = testbed_messages[DO_CONNECT_INIT]
943         self._client.send_msg(msg)
944         reply = self._client.read_reply()
945         result = reply.split("|")
946         code = int(result[0])
947         text = base64.b64decode(result[1])
948         if code == ERROR:
949             raise RuntimeError(text)
950
951     def do_connect_compl(self):
952         msg = testbed_messages[DO_CONNECT_COMPL]
953         self._client.send_msg(msg)
954         reply = self._client.read_reply()
955         result = reply.split("|")
956         code = int(result[0])
957         text = base64.b64decode(result[1])
958         if code == ERROR:
959             raise RuntimeError(text)
960
961     def do_configure(self):
962         msg = testbed_messages[DO_CONFIGURE]
963         self._client.send_msg(msg)
964         reply = self._client.read_reply()
965         result = reply.split("|")
966         code = int(result[0])
967         text = base64.b64decode(result[1])
968         if code == ERROR:
969             raise RuntimeError(text)
970
971     def do_preconfigure(self):
972         msg = testbed_messages[DO_PRECONFIGURE]
973         self._client.send_msg(msg)
974         reply = self._client.read_reply()
975         result = reply.split("|")
976         code = int(result[0])
977         text = base64.b64decode(result[1])
978         if code == ERROR:
979             raise RuntimeError(text)
980
981     def do_cross_connect_init(self, cross_data):
982         msg = testbed_messages[DO_CROSS_CONNECT_INIT]
983         pcross_data = cPickle.dumps(cross_data)
984         cross_data = base64.b64encode(pcross_data)
985         msg = msg % (cross_data)
986         self._client.send_msg(msg)
987         reply = self._client.read_reply()
988         result = reply.split("|")
989         code = int(result[0])
990         text = base64.b64decode(result[1])
991         if code == ERROR:
992             raise RuntimeError(text)
993
994     def do_cross_connect_compl(self, cross_data):
995         msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
996         pcross_data = cPickle.dumps(cross_data)
997         cross_data = base64.b64encode(pcross_data)
998         msg = msg % (cross_data)
999         self._client.send_msg(msg)
1000         reply = self._client.read_reply()
1001         result = reply.split("|")
1002         code = int(result[0])
1003         text = base64.b64decode(result[1])
1004         if code == ERROR:
1005             raise RuntimeError(text)
1006
1007     def start(self, time = TIME_NOW):
1008         msg = testbed_messages[START]
1009         self._client.send_msg(msg)
1010         reply = self._client.read_reply()
1011         result = reply.split("|")
1012         code = int(result[0])
1013         text = base64.b64decode(result[1])
1014         if code == ERROR:
1015             raise RuntimeError(text)
1016
1017     def stop(self, time = TIME_NOW):
1018         msg = testbed_messages[STOP]
1019         self._client.send_msg(msg)
1020         reply = self._client.read_reply()
1021         result = reply.split("|")
1022         code = int(result[0])
1023         text = base64.b64decode(result[1])
1024         if code == ERROR:
1025             raise RuntimeError(text)
1026
1027     def set(self, time, guid, name, value):
1028         msg = testbed_messages[SET]
1029         type = get_type(value)
1030         # avoid having "|" in this parameters
1031         name = base64.b64encode(name)
1032         value = base64.b64encode(str(value))
1033         msg = msg % (time, guid, name, value, type)
1034         self._client.send_msg(msg)
1035         reply = self._client.read_reply()
1036         result = reply.split("|")
1037         code = int(result[0])
1038         text = base64.b64decode(result[1])
1039         if code == ERROR:
1040             raise RuntimeError(text)
1041
1042     def get(self, time, guid, name):
1043         msg = testbed_messages[GET]
1044         # avoid having "|" in this parameters
1045         name = base64.b64encode(name)
1046         msg = msg % (time, guid, name)
1047         self._client.send_msg(msg)
1048         reply = self._client.read_reply()
1049         result = reply.split("|")
1050         code = int(result[0])
1051         text = base64.b64decode(result[1])
1052         if code == ERROR:
1053             raise RuntimeError(text)
1054         return text
1055
1056     def get_address(self, guid, index, attribute):
1057         msg = testbed_messages[GET_ADDRESS]
1058         # avoid having "|" in this parameters
1059         attribute = base64.b64encode(attribute)
1060         msg = msg % (guid, index, attribute)
1061         self._client.send_msg(msg)
1062         reply = self._client.read_reply()
1063         result = reply.split("|")
1064         code = int(result[0])
1065         text = base64.b64decode(result[1])
1066         if code == ERROR:
1067             raise RuntimeError(text)
1068         return text
1069
1070     def get_route(self, guid, index, attribute):
1071         msg = testbed_messages[GET_ROUTE]
1072         # avoid having "|" in this parameters
1073         attribute = base64.b64encode(attribute)
1074         msg = msg % (guid, index, attribute)
1075         self._client.send_msg(msg)
1076         reply = self._client.read_reply()
1077         result = reply.split("|")
1078         code = int(result[0])
1079         text = base64.b64decode(result[1])
1080         if code == ERROR:
1081             raise RuntimeError(text)
1082         return text
1083
1084     def action(self, time, guid, action):
1085         msg = testbed_messages[ACTION]
1086         msg = msg % (time, guid, action)
1087         self._client.send_msg(msg)
1088         reply = self._client.read_reply()
1089         result = reply.split("|")
1090         code = int(result[0])
1091         text = base64.b64decode(result[1])
1092         if code == ERROR:
1093             raise RuntimeError(text)
1094
1095     def status(self, guid = None):
1096         msg = testbed_messages[STATUS]
1097         msg = msg % str(guid)
1098         self._client.send_msg(msg)
1099         reply = self._client.read_reply()
1100         result = reply.split("|")
1101         code = int(result[0])
1102         text = base64.b64decode(result[1])
1103         if code == ERROR:
1104             raise RuntimeError(text)
1105         return int(text)
1106
1107     def trace(self, guid, trace_id, attribute='value'):
1108         msg = testbed_messages[TRACE]
1109         attribute = base64.b64encode(attribute)
1110         msg = msg % (guid, trace_id, attribute)
1111         self._client.send_msg(msg)
1112         reply = self._client.read_reply()
1113         result = reply.split("|")
1114         code = int(result[0])
1115         text = base64.b64decode(result[1])
1116         if code == ERROR:
1117             raise RuntimeError(text)
1118         return text
1119
1120     def get_attribute_list(self, guid):
1121         msg = testbed_messages[GET_ATTRIBUTE_LIST]
1122         msg = msg % (guid)
1123         self._client.send_msg(msg)
1124         reply = self._client.read_reply()
1125         result = reply.split("|")
1126         code = int(result[0])
1127         text = base64.b64decode(result[1])
1128         if code == ERROR:
1129             raise RuntimeError(text)
1130         attr_list = cPickle.loads(text)
1131         return attr_list
1132
1133     def shutdown(self):
1134         msg = testbed_messages[SHUTDOWN]
1135         self._client.send_msg(msg)
1136         reply = self._client.read_reply()
1137         result = reply.split("|")
1138         code = int(result[0])
1139         text = base64.b64decode(result[1])
1140         if code == ERROR:
1141             raise RuntimeError(text)
1142         self._client.send_stop()
1143         self._client.read_reply() # wait for it
1144
class ExperimentControllerProxy(object):
    """Client-side proxy for an ExperimentControllerServer.

    Every public operation takes one entry of ``controller_messages``,
    sends it through ``self._client`` and parses a reply of the form
    ``<code>|<base64 text>``.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            agent = None):
        """Optionally launch the server, then connect a client to it.

        When ``launch`` is true, ``experiment_xml`` is required. With a
        ``host`` the server is started through
        ``server.popen_ssh_subprocess``; without one an
        ExperimentControllerServer is created and run from this process.
        In every case a ``server.Client`` is created afterwards.

        Raises RuntimeError when ``launch`` is true and ``experiment_xml``
        is None, or when the ssh subprocess reports a failure.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                # Adjacent literals instead of a backslash continuation, so
                # the source indentation does not leak into the message.
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            # ssh
            if host is not None:
                xml = experiment_xml
                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
                        s = ExperimentControllerServer(%r, %r, %r);\
                        s.run()" % (root_dir, log_level, xml)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent)
                # NOTE(review): presumably proc is a subprocess.Popen, whose
                # poll() is truthy only after a non-zero exit -- confirm.
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent)

    def _exchange(self, msg):
        """Send ``msg`` and return the reply as a ``(code, text)`` pair."""
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _checked_exchange(self, msg):
        """Send ``msg``; return the reply text or raise on an ERROR code."""
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Return the decoded reply text of the XML message.

        Raises RuntimeError when the reply code is ERROR.
        """
        return self._checked_exchange(controller_messages[XML])

    def set_access_configuration(self, testbed_guid, access_config):
        """Send an ACCESS message built from ``access_config`` attributes.

        The values of "mode", "communication", "host", "user", "port",
        "rootDirectory", "useAgent" and "logLevel" are read with
        ``access_config.get_attribute_value`` and interpolated after
        ``testbed_guid``.

        Raises RuntimeError when the reply code is ERROR.
        """
        mode = access_config.get_attribute_value("mode")
        communication = access_config.get_attribute_value("communication")
        host = access_config.get_attribute_value("host")
        user = access_config.get_attribute_value("user")
        port = access_config.get_attribute_value("port")
        root_dir = access_config.get_attribute_value("rootDirectory")
        use_agent = access_config.get_attribute_value("useAgent")
        log_level = access_config.get_attribute_value("logLevel")
        msg = controller_messages[ACCESS]
        msg = msg % (testbed_guid, mode, communication, host, user, port,
                root_dir, use_agent, log_level)
        self._checked_exchange(msg)

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE message and return the decoded reply text.

        ``attribute`` is base64-encoded before interpolation. Unlike the
        other operations, any reply code other than OK raises RuntimeError.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        msg = msg % (testbed_guid, guid, trace_id, attribute)
        code, text = self._exchange(msg)
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send the START message; raise RuntimeError on an ERROR reply."""
        self._checked_exchange(controller_messages[START])

    def stop(self):
        """Send the STOP message; raise RuntimeError on an ERROR reply."""
        self._checked_exchange(controller_messages[STOP])

    def recover(self):
        """Send the RECOVER message; raise RuntimeError on an ERROR reply."""
        self._checked_exchange(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send a FINISHED message for ``guid``.

        Returns True when the decoded reply text is exactly "True".
        Raises RuntimeError when the reply code is ERROR.
        """
        msg = controller_messages[FINISHED]
        msg = msg % guid
        return self._checked_exchange(msg) == "True"

    def shutdown(self):
        """Send the SHUTDOWN message, then stop the client connection.

        After a non-ERROR reply, ``send_stop()`` is issued on the client and
        one more reply is read before returning.
        Raises RuntimeError when the reply code is ERROR.
        """
        self._checked_exchange(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1273