Fixed several typos
[nepi.git] / src / nepi / util / proxy.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import base64
5 from nepi.core.attributes import AttributesMap, Attribute
6 from nepi.util import server, validation
7 from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
8 import getpass
9 import cPickle
10 import sys
11 import time
12 import tempfile
13 import shutil
14
# PROTOCOL REPLIES
# Status code carried in the first "|"-separated field of every reply.
OK, ERROR = 0, 1

# PROTOCOL INSTRUCTION MESSAGES
# Instruction code carried in the first "|"-separated field of every
# request.  Code 3 is not assigned.
XML = 2
TRACE = 4
FINISHED = 5
START = 6
STOP = 7
SHUTDOWN = 8
CONFIGURE = 9
CREATE = 10
CREATE_SET = 11
FACTORY_SET = 12
CONNECT = 13
CROSS_CONNECT = 14
ADD_TRACE = 15
ADD_ADDRESS = 16
ADD_ROUTE = 17
DO_SETUP = 18
DO_CREATE = 19
DO_CONNECT_INIT = 20
DO_CONFIGURE = 21
DO_CROSS_CONNECT_INIT = 22
GET = 23
SET = 24
ACTION = 25
STATUS = 26
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
DO_PRECONFIGURE = 31
GET_ATTRIBUTE_LIST = 32
DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
TESTBED_ID = 35
TESTBED_VERSION = 36
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38

# PARAMETER TYPE
# Tags returned by get_type() and interpreted by set_type() so that a
# value sent as text can be converted back to its Python type.
STRING = 100
INTEGER = 101
BOOL = 102
FLOAT = 103
62
# EXPERIMENT CONTROLLER PROTOCOL MESSAGES
# Request templates keyed by instruction code.  Every template begins with
# the instruction code itself; the remaining "%" placeholders are left in
# place for a later "%" formatting step.
controller_messages = {
    XML: str(XML),
    # testbed_guid | guid | trace_id | attribute
    TRACE: str(TRACE) + "|%d|%d|%s|%s",
    # guid
    FINISHED: str(FINISHED) + "|%d",
    START: str(START),
    STOP: str(STOP),
    RECOVER: str(RECOVER),
    SHUTDOWN: str(SHUTDOWN),
}
73
# TESTBED INSTANCE PROTOCOL MESSAGES
# Request templates keyed by instruction code.  Every template begins with
# the instruction code itself; the remaining "%" placeholders are left in
# place for a later "%" formatting step.
testbed_messages = {
    # guid | trace_id | attribute
    TRACE: str(TRACE) + "|%d|%s|%s",
    START: str(START),
    STOP: str(STOP),
    SHUTDOWN: str(SHUTDOWN),
    # name | value | type
    CONFIGURE: str(CONFIGURE) + "|%s|%s|%d",
    # guid | factory_id
    CREATE: str(CREATE) + "|%d|%s",
    # guid | name | value | type
    CREATE_SET: str(CREATE_SET) + "|%d|%s|%s|%d",
    # guid | name | value | type
    FACTORY_SET: str(FACTORY_SET) + "|%d|%s|%s|%d",
    # guid1 | connector_type_name1 | guid2 | connector_type_name2
    CONNECT: str(CONNECT) + "|%d|%s|%d|%s",
    CROSS_CONNECT: str(CROSS_CONNECT) + "|%d|%s|%d|%d|%s|%s|%s",
    # guid | trace_id
    ADD_TRACE: str(ADD_TRACE) + "|%d|%s",
    # guid | address | netprefix | broadcast
    ADD_ADDRESS: str(ADD_ADDRESS) + "|%d|%s|%d|%s",
    # guid | destination | netprefix | nexthop
    ADD_ROUTE: str(ADD_ROUTE) + "|%d|%s|%d|%s",
    DO_SETUP: str(DO_SETUP),
    DO_CREATE: str(DO_CREATE),
    DO_CONNECT_INIT: str(DO_CONNECT_INIT),
    DO_CONNECT_COMPL: str(DO_CONNECT_COMPL),
    DO_CONFIGURE: str(DO_CONFIGURE),
    DO_PRECONFIGURE: str(DO_PRECONFIGURE),
    DO_CROSS_CONNECT_INIT: str(DO_CROSS_CONNECT_INIT) + "|%s",
    DO_CROSS_CONNECT_COMPL: str(DO_CROSS_CONNECT_COMPL) + "|%s",
    GET: str(GET) + "|%d|%s|%s",
    SET: str(SET) + "|%d|%s|%s|%d|%s",
    EXPERIMENT_GET: str(EXPERIMENT_GET) + "|%d|%d|%s|%s",
    EXPERIMENT_SET: str(EXPERIMENT_SET) + "|%d|%d|%s|%s|%d|%s",
    GET_ROUTE: str(GET_ROUTE) + "|%d|%d|%s",
    GET_ADDRESS: str(GET_ADDRESS) + "|%d|%d|%s",
    # time | guid | command
    ACTION: str(ACTION) + "|%s|%d|%s",
    STATUS: str(STATUS) + "|%s",
    GUIDS: str(GUIDS),
    GET_ATTRIBUTE_LIST: str(GET_ATTRIBUTE_LIST) + "|%d",
    TESTBED_ID: str(TESTBED_ID),
    TESTBED_VERSION: str(TESTBED_VERSION),
}
110
# Human readable name of every protocol code (replies, instructions and
# parameter types); used when writing log lines.
instruction_text = {
    OK: "OK",
    ERROR: "ERROR",
    XML: "XML",
    TRACE: "TRACE",
    FINISHED: "FINISHED",
    START: "START",
    STOP: "STOP",
    SHUTDOWN: "SHUTDOWN",
    CONFIGURE: "CONFIGURE",
    CREATE: "CREATE",
    CREATE_SET: "CREATE_SET",
    FACTORY_SET: "FACTORY_SET",
    CONNECT: "CONNECT",
    CROSS_CONNECT: "CROSS_CONNECT",
    ADD_TRACE: "ADD_TRACE",
    ADD_ADDRESS: "ADD_ADDRESS",
    ADD_ROUTE: "ADD_ROUTE",
    DO_SETUP: "DO_SETUP",
    DO_CREATE: "DO_CREATE",
    DO_CONNECT_INIT: "DO_CONNECT_INIT",
    DO_CONFIGURE: "DO_CONFIGURE",
    DO_CROSS_CONNECT_INIT: "DO_CROSS_CONNECT_INIT",
    GET: "GET",
    SET: "SET",
    ACTION: "ACTION",
    STATUS: "STATUS",
    GUIDS: "GUIDS",
    GET_ROUTE: "GET_ROUTE",
    GET_ADDRESS: "GET_ADDRESS",
    RECOVER: "RECOVER",
    DO_PRECONFIGURE: "DO_PRECONFIGURE",
    GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
    DO_CONNECT_COMPL: "DO_CONNECT_COMPL",
    DO_CROSS_CONNECT_COMPL: "DO_CROSS_CONNECT_COMPL",
    TESTBED_ID: "TESTBED_ID",
    TESTBED_VERSION: "TESTBED_VERSION",
    EXPERIMENT_SET: "EXPERIMENT_SET",
    EXPERIMENT_GET: "EXPERIMENT_GET",
    STRING: "STRING",
    INTEGER: "INTEGER",
    BOOL: "BOOL",
    FLOAT: "FLOAT",
}
155
def get_type(value):
    """Return the parameter type tag describing ``value``.

    The result is BOOL, INTEGER or FLOAT for instances of ``bool``,
    ``int`` and ``float`` respectively, and STRING for anything else.
    """
    # bool must be probed before int: bool is a subclass of int.
    for python_type, tag in ((bool, BOOL), (int, INTEGER), (float, FLOAT)):
        if isinstance(value, python_type):
            return tag
    return STRING
165
def set_type(type, value):
    """Convert ``value`` according to the parameter type tag ``type``.

    INTEGER and FLOAT go through ``int()`` and ``float()``; BOOL is true
    only when ``value`` equals the string "True"; any other tag yields
    ``str(value)``.
    """
    if type == BOOL:
        return value == "True"
    if type == INTEGER:
        return int(value)
    if type == FLOAT:
        return float(value)
    return str(value)
176
def log_msg(server, params):
    """Write a debug log line describing an incoming protocol message.

    ``server`` is the object whose ``log_debug`` method receives the line
    (its class name is included in the text).  ``params`` is the message
    already split into fields; ``params[0]`` is the instruction code.

    Raises ValueError if ``params[0]`` is not an integer.
    """
    instr = int(params[0])
    # An unknown code must not abort logging with a KeyError: callers log
    # the message before they decide whether the instruction is valid.
    instr_txt = instruction_text.get(instr, "UNKNOWN(%d)" % instr)
    fields = ", ".join(str(field) for field in params[1:])
    server.log_debug("%s - msg: %s [%s]" % (
        server.__class__.__name__, instr_txt, fields))
182
def log_reply(server, reply):
    """Write a debug log line describing an outgoing reply.

    ``reply`` has the form "<code>|<base64 text>"; the code is shown by
    name and the text is shown decoded.
    """
    fields = reply.split("|")
    code_txt = instruction_text[int(fields[0])]
    decoded = base64.b64decode(fields[1])
    line = "%s - reply: %s %s" % (server.__class__.__name__, code_txt, decoded)
    server.log_debug(line)
190
def to_server_log_level(log_level):
    """Map a deployment log level onto a ``server`` module log level.

    Returns ``server.DEBUG_LEVEL`` when ``log_level`` equals
    ``DC.DEBUG_LEVEL`` and ``server.ERROR_LEVEL`` otherwise.
    """
    if log_level == DC.DEBUG_LEVEL:
        return server.DEBUG_LEVEL
    return server.ERROR_LEVEL
197
def get_access_config_params(access_config):
    """Extract the connection settings stored in ``access_config``.

    Returns the tuple ``(root_dir, log_level, user, host, port, key,
    agent, environment_setup)``.  ``log_level`` is already translated with
    to_server_log_level().  ``user``, ``host``, ``port``, ``key`` and
    ``agent`` are None unless the communication mode is ``DC.ACCESS_SSH``;
    ``environment_setup`` is None when the attribute is not present.
    """
    read = access_config.get_attribute_value

    root_dir = read(DC.ROOT_DIRECTORY)
    log_level = to_server_log_level(read(DC.LOG_LEVEL))
    communication = read(DC.DEPLOYMENT_COMMUNICATION)

    environment_setup = None
    if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP):
        environment_setup = read(DC.DEPLOYMENT_ENVIRONMENT_SETUP)

    if communication == DC.ACCESS_SSH:
        user = read(DC.DEPLOYMENT_USER)
        host = read(DC.DEPLOYMENT_HOST)
        port = read(DC.DEPLOYMENT_PORT)
        agent = read(DC.USE_AGENT)
        key = read(DC.DEPLOYMENT_KEY)
    else:
        user = host = port = agent = key = None

    return (root_dir, log_level, user, host, port, key, agent,
            environment_setup)
216
class AccessConfiguration(AttributesMap):
    """Attribute map pre-populated with the deployment attributes.

    ``params``, when given, is a mapping of attribute name to raw value;
    each value is run through the parser registered in
    ``Attribute.type_parsers`` for the attribute's type before being set.
    """

    def __init__(self, params=None):
        super(AccessConfiguration, self).__init__()

        # Imported at call time rather than at module level -- presumably
        # to avoid a circular import; TODO confirm.
        from nepi.core.metadata import Metadata

        for _, attr_info in Metadata.DEPLOYMENT_ATTRIBUTES:
            self.add_attribute(**attr_info)

        if not params:
            return

        for attr_name, raw_value in params.iteritems():
            attr_type = self.get_attribute_type(attr_name)
            parse = Attribute.type_parsers[attr_type]
            self.set_attribute_value(attr_name, parse(raw_value))
231
class TempDir(object):
    """Owner of a freshly created temporary directory.

    The directory is created with ``tempfile.mkdtemp()`` and its location
    is exposed as ``path``.  The whole tree is removed when the object is
    finalized.
    """

    def __init__(self):
        self.path = tempfile.mkdtemp()

    def __del__(self):
        # Finalizers must never raise: the directory may already have been
        # removed, and ``path`` is missing altogether if mkdtemp() failed.
        path = getattr(self, "path", None)
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
238
class PermDir(object):
    """Holder for a caller supplied directory, exposed as ``path``.

    Offers the same ``path`` attribute as TempDir but defines no
    finalizer, so the directory is never removed by this object.
    """

    def __init__(self, path):
        self.path = path
242
def create_controller(xml, access_config=None):
    """Create an experiment controller for the experiment described by ``xml``.

    Without ``access_config``, or when its deployment mode is unset or
    ``DC.MODE_SINGLE_PROCESS``, an in-process ``ExperimentController`` is
    returned.  It runs in the configured root directory when
    ``DC.ROOT_DIRECTORY`` is present, otherwise in a temporary directory.
    With ``DC.MODE_DAEMON`` an ``ExperimentControllerProxy`` is returned.

    Raises ValueError when recovery (``DC.RECOVER``) is requested in
    single process mode, and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))

        from nepi.core.execute import ExperimentController

        if access_config and access_config.has_attribute(DC.ROOT_DIRECTORY):
            root_dir = PermDir(
                access_config.get_attribute_value(DC.ROOT_DIRECTORY))
        else:
            root_dir = TempDir()
        controller = ExperimentController(xml, root_dir.path)

        # Keep the directory holder alive as long as the controller; a
        # TempDir removes its directory when it is finalized.
        controller._tempdir = root_dir

        return controller

    if mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent,
                environment_setup) = get_access_config_params(access_config)
        return ExperimentControllerProxy(root_dir, log_level,
                experiment_xml=xml, host=host, port=port, user=user,
                ident_key=key, agent=agent, launch=launch,
                environment_setup=environment_setup)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
273
def create_testbed_controller(testbed_id, testbed_version, access_config):
    """Create a testbed controller for the given testbed id and version.

    When ``access_config`` is empty, or its deployment mode is unset or
    ``DC.MODE_SINGLE_PROCESS``, the controller is built in-process with
    _build_testbed_controller().  With ``DC.MODE_DAEMON`` a
    ``TestbedControllerProxy`` is returned.

    Raises ValueError when recovery (``DC.RECOVER``) is requested in
    single process mode, and RuntimeError for any other deployment mode.
    """
    if access_config:
        mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
        launch = not access_config.get_attribute_value(DC.RECOVER)
    else:
        mode = None
        launch = True

    if not mode or mode == DC.MODE_SINGLE_PROCESS:
        if not launch:
            raise ValueError(
                "Unsupported instantiation mode: %s with launch=False" % (mode,))
        return _build_testbed_controller(testbed_id, testbed_version)

    if mode == DC.MODE_DAEMON:
        (root_dir, log_level, user, host, port, key, agent,
                environment_setup) = get_access_config_params(access_config)
        return TestbedControllerProxy(root_dir, log_level,
                testbed_id=testbed_id, testbed_version=testbed_version,
                host=host, port=port, ident_key=key, user=user,
                agent=agent, launch=launch,
                environment_setup=environment_setup)

    raise RuntimeError("Unsupported access configuration '%s'" % mode)
291
292 def _build_testbed_controller(testbed_id, testbed_version):
293     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
294     if not mod_name in sys.modules:
295         __import__(mod_name)
296     module = sys.modules[mod_name]
297     return module.TestbedController(testbed_version)
298
class TestbedControllerServer(server.Server):
    """Server endpoint that executes protocol requests on a testbed controller.

    A request is a "|"-separated string whose first field is an
    instruction code; the remaining fields follow the layout of the
    matching template in ``testbed_messages``.  Every reply has the form
    "<OK or ERROR>|<base64 payload>".
    """

    # Instruction code -> name of the handler method.  The name is resolved
    # with getattr() at dispatch time so subclasses may override handlers.
    _INSTRUCTION_HANDLERS = {
        TRACE: "trace",
        START: "start",
        STOP: "stop",
        SHUTDOWN: "shutdown",
        CONFIGURE: "defer_configure",
        CREATE: "defer_create",
        CREATE_SET: "defer_create_set",
        FACTORY_SET: "defer_factory_set",
        CONNECT: "defer_connect",
        CROSS_CONNECT: "defer_cross_connect",
        ADD_TRACE: "defer_add_trace",
        ADD_ADDRESS: "defer_add_address",
        ADD_ROUTE: "defer_add_route",
        DO_SETUP: "do_setup",
        DO_CREATE: "do_create",
        DO_CONNECT_INIT: "do_connect_init",
        DO_CONNECT_COMPL: "do_connect_compl",
        DO_CONFIGURE: "do_configure",
        DO_PRECONFIGURE: "do_preconfigure",
        DO_CROSS_CONNECT_INIT: "do_cross_connect_init",
        DO_CROSS_CONNECT_COMPL: "do_cross_connect_compl",
        GET: "get",
        SET: "set",
        GET_ADDRESS: "get_address",
        GET_ROUTE: "get_route",
        ACTION: "action",
        STATUS: "status",
        GUIDS: "guids",
        GET_ATTRIBUTE_LIST: "get_attribute_list",
        TESTBED_ID: "testbed_id",
        TESTBED_VERSION: "testbed_version",
    }

    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
        super(TestbedControllerServer, self).__init__(root_dir, log_level)
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version
        # The controller itself is only built in post_daemonize().
        self._testbed = None

    def post_daemonize(self):
        """Build the testbed controller that will serve the requests."""
        self._testbed = _build_testbed_controller(self._testbed_id,
                self._testbed_version)

    @staticmethod
    def _ok_reply(payload=""):
        """Format an OK reply carrying ``payload`` base64 encoded."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    @staticmethod
    def _error_reply(text):
        """Format an ERROR reply carrying ``text`` base64 encoded."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute the request ``msg`` and return the reply string.

        Any failure while parsing or executing the request, including a
        malformed or unknown instruction code, is logged and reported to
        the peer as an ERROR reply instead of being raised.
        """
        if not msg:
            reply = self._error_reply("Invalid command line")
        else:
            try:
                params = msg.split("|")
                instruction = int(params[0])
                handler_name = self._INSTRUCTION_HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._error_reply(error)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberately broad: whatever goes wrong must be sent back
                # to the peer.  log_error() without arguments is expected
                # to return the text of the error being handled.
                error = self.log_error()
                reply = self._error_reply(error)
        log_reply(self, reply)
        return reply

    def guids(self, params):
        """GUIDS: reply with the pickled ``guids`` of the testbed."""
        return self._ok_reply(cPickle.dumps(self._testbed.guids))

    def testbed_id(self, params):
        """TESTBED_ID: reply with the testbed id as text."""
        return self._ok_reply(str(self._testbed.testbed_id))

    def testbed_version(self, params):
        """TESTBED_VERSION: reply with the testbed version as text."""
        return self._ok_reply(str(self._testbed.testbed_version))

    def defer_create(self, params):
        """CREATE: fields are guid, factory_id."""
        guid = int(params[1])
        factory_id = params[2]
        self._testbed.defer_create(guid, factory_id)
        return self._ok_reply()

    def trace(self, params):
        """TRACE: fields are guid, trace_id, base64 attribute."""
        guid = int(params[1])
        trace_id = params[2]
        attribute = base64.b64decode(params[3])
        return self._ok_reply(self._testbed.trace(guid, trace_id, attribute))

    def start(self, params):
        """START: no fields."""
        self._testbed.start()
        return self._ok_reply()

    def stop(self, params):
        """STOP: no fields."""
        self._testbed.stop()
        return self._ok_reply()

    def shutdown(self, params):
        """SHUTDOWN: no fields."""
        self._testbed.shutdown()
        return self._ok_reply()

    def defer_configure(self, params):
        """CONFIGURE: fields are base64 name, base64 value, type."""
        name = base64.b64decode(params[1])
        value = set_type(int(params[3]), base64.b64decode(params[2]))
        self._testbed.defer_configure(name, value)
        return self._ok_reply()

    def defer_create_set(self, params):
        """CREATE_SET: fields are guid, base64 name, base64 value, type."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = set_type(int(params[4]), base64.b64decode(params[3]))
        self._testbed.defer_create_set(guid, name, value)
        return self._ok_reply()

    def defer_factory_set(self, params):
        """FACTORY_SET: fields are guid, base64 name, base64 value, type.

        The layout follows the FACTORY_SET template and the request built
        by TestbedControllerProxy.defer_factory_set(guid, name, value).
        """
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = set_type(int(params[4]), base64.b64decode(params[3]))
        self._testbed.defer_factory_set(guid, name, value)
        return self._ok_reply()

    def defer_connect(self, params):
        """CONNECT: fields are guid1, connector1, guid2, connector2."""
        guid1 = int(params[1])
        connector_type_name1 = params[2]
        guid2 = int(params[3])
        connector_type_name2 = params[4]
        self._testbed.defer_connect(guid1, connector_type_name1, guid2,
            connector_type_name2)
        return self._ok_reply()

    def defer_cross_connect(self, params):
        """CROSS_CONNECT: guid, connector, then the four cross_* fields."""
        guid = int(params[1])
        connector_type_name = params[2]
        cross_guid = int(params[3])
        cross_testbed_guid = int(params[4])
        cross_testbed_id = params[5]
        cross_factory_id = params[6]
        cross_connector_type_name = params[7]
        self._testbed.defer_cross_connect(guid, connector_type_name,
            cross_guid, cross_testbed_guid, cross_testbed_id,
            cross_factory_id, cross_connector_type_name)
        return self._ok_reply()

    def defer_add_trace(self, params):
        """ADD_TRACE: fields are guid, trace_id."""
        guid = int(params[1])
        trace_id = params[2]
        self._testbed.defer_add_trace(guid, trace_id)
        return self._ok_reply()

    def defer_add_address(self, params):
        """ADD_ADDRESS: fields are guid, address, netprefix, broadcast."""
        guid = int(params[1])
        address = params[2]
        netprefix = int(params[3])
        broadcast = params[4]
        self._testbed.defer_add_address(guid, address, netprefix, broadcast)
        return self._ok_reply()

    def defer_add_route(self, params):
        """ADD_ROUTE: fields are guid, destination, netprefix, nexthop."""
        guid = int(params[1])
        destination = params[2]
        netprefix = int(params[3])
        nexthop = params[4]
        self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
        return self._ok_reply()

    def do_setup(self, params):
        """DO_SETUP: no fields."""
        self._testbed.do_setup()
        return self._ok_reply()

    def do_create(self, params):
        """DO_CREATE: no fields."""
        self._testbed.do_create()
        return self._ok_reply()

    def do_connect_init(self, params):
        """DO_CONNECT_INIT: no fields."""
        self._testbed.do_connect_init()
        return self._ok_reply()

    def do_connect_compl(self, params):
        """DO_CONNECT_COMPL: no fields."""
        self._testbed.do_connect_compl()
        return self._ok_reply()

    def do_configure(self, params):
        """DO_CONFIGURE: no fields."""
        self._testbed.do_configure()
        return self._ok_reply()

    def do_preconfigure(self, params):
        """DO_PRECONFIGURE: no fields."""
        self._testbed.do_preconfigure()
        return self._ok_reply()

    def do_cross_connect_init(self, params):
        """DO_CROSS_CONNECT_INIT: single field, base64 pickled cross data."""
        # NOTE(review): unpickling data received from the peer executes
        # arbitrary code if the peer is not trusted.
        cross_data = cPickle.loads(base64.b64decode(params[1]))
        self._testbed.do_cross_connect_init(cross_data)
        return self._ok_reply()

    def do_cross_connect_compl(self, params):
        """DO_CROSS_CONNECT_COMPL: single field, base64 pickled cross data."""
        # NOTE(review): unpickling data received from the peer executes
        # arbitrary code if the peer is not trusted.
        cross_data = cPickle.loads(base64.b64decode(params[1]))
        self._testbed.do_cross_connect_compl(cross_data)
        return self._ok_reply()

    def get(self, params):
        """GET: fields are guid, base64 name, time."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        when = params[3]
        return self._ok_reply(str(self._testbed.get(guid, name, when)))

    def set(self, params):
        """SET: fields are guid, base64 name, base64 value, type, time."""
        guid = int(params[1])
        name = base64.b64decode(params[2])
        value = set_type(int(params[4]), base64.b64decode(params[3]))
        when = params[5]
        self._testbed.set(guid, name, value, when)
        return self._ok_reply()

    def get_address(self, params):
        """GET_ADDRESS: fields are guid, index, base64 attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_address(guid, index, attribute)
        return self._ok_reply(str(value))

    def get_route(self, params):
        """GET_ROUTE: fields are guid, index, base64 attribute."""
        guid = int(params[1])
        index = int(params[2])
        attribute = base64.b64decode(params[3])
        value = self._testbed.get_route(guid, index, attribute)
        return self._ok_reply(str(value))

    def action(self, params):
        """ACTION: fields are time, guid, base64 command."""
        when = params[1]
        guid = int(params[2])
        command = base64.b64decode(params[3])
        self._testbed.action(when, guid, command)
        return self._ok_reply()

    def status(self, params):
        """STATUS: single field, a guid or the literal text "None"."""
        guid = None if params[1] == "None" else int(params[1])
        return self._ok_reply(str(self._testbed.status(guid)))

    def get_attribute_list(self, params):
        """GET_ATTRIBUTE_LIST: single field guid; reply is pickled."""
        guid = int(params[1])
        attr_list = self._testbed.get_attribute_list(guid)
        return self._ok_reply(cPickle.dumps(attr_list))
596
class ExperimentControllerServer(server.Server):
    """Server endpoint that executes protocol requests on an experiment controller.

    A request is a "|"-separated string whose first field is an
    instruction code.  Every reply has the form
    "<OK or ERROR>|<base64 payload>".
    """

    # Instruction code -> name of the handler method.  The name is resolved
    # with getattr() at dispatch time so subclasses may override handlers.
    _INSTRUCTION_HANDLERS = {
        XML: "experiment_xml",
        TRACE: "trace",
        FINISHED: "is_finished",
        EXPERIMENT_GET: "get",
        EXPERIMENT_SET: "set",
        START: "start",
        STOP: "stop",
        RECOVER: "recover",
        SHUTDOWN: "shutdown",
    }

    def __init__(self, root_dir, log_level, experiment_xml):
        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
        self._experiment_xml = experiment_xml
        # The controller itself is only built in post_daemonize().
        self._controller = None

    def post_daemonize(self):
        """Build the experiment controller that will serve the requests."""
        from nepi.core.execute import ExperimentController
        self._controller = ExperimentController(self._experiment_xml,
            root_dir=self._root_dir)

    @staticmethod
    def _ok_reply(payload=""):
        """Format an OK reply carrying ``payload`` base64 encoded."""
        return "%d|%s" % (OK, base64.b64encode(payload))

    @staticmethod
    def _error_reply(text):
        """Format an ERROR reply carrying ``text`` base64 encoded."""
        return "%d|%s" % (ERROR, base64.b64encode(text))

    def reply_action(self, msg):
        """Execute the request ``msg`` and return the reply string.

        Any failure while parsing or executing the request, including a
        malformed or unknown instruction code, is logged and reported to
        the peer as an ERROR reply instead of being raised.
        """
        if not msg:
            reply = self._error_reply("Invalid command line")
        else:
            try:
                params = msg.split("|")
                instruction = int(params[0])
                handler_name = self._INSTRUCTION_HANDLERS.get(instruction)
                if handler_name is None:
                    error = "Invalid instruction %s" % instruction
                    self.log_error(error)
                    reply = self._error_reply(error)
                else:
                    log_msg(self, params)
                    reply = getattr(self, handler_name)(params)
            except:
                # Deliberately broad: whatever goes wrong must be sent back
                # to the peer.  log_error() without arguments is expected
                # to return the text of the error being handled.
                error = self.log_error()
                reply = self._error_reply(error)
        log_reply(self, reply)
        return reply

    def experiment_xml(self, params):
        """XML: reply with the controller's experiment XML."""
        return self._ok_reply(self._controller.experiment_xml)

    def trace(self, params):
        """TRACE: fields are testbed_guid, guid, trace_id, base64 attribute."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        trace_id = params[3]
        attribute = base64.b64decode(params[4])
        trace = self._controller.trace(testbed_guid, guid, trace_id,
            attribute)
        return self._ok_reply(trace)

    def is_finished(self, params):
        """FINISHED: single field guid; reply is the status as text."""
        guid = int(params[1])
        return self._ok_reply(str(self._controller.is_finished(guid)))

    def get(self, params):
        """EXPERIMENT_GET: fields are testbed_guid, guid, base64 name, time."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        when = params[4]
        value = self._controller.get(testbed_guid, guid, name, when)
        return self._ok_reply(str(value))

    def set(self, params):
        """EXPERIMENT_SET: testbed_guid, guid, base64 name, base64 value,
        type, time."""
        testbed_guid = int(params[1])
        guid = int(params[2])
        name = base64.b64decode(params[3])
        value = set_type(int(params[5]), base64.b64decode(params[4]))
        when = params[6]
        self._controller.set(testbed_guid, guid, name, value, when)
        return self._ok_reply()

    def start(self, params):
        """START: no fields."""
        self._controller.start()
        return self._ok_reply()

    def stop(self, params):
        """STOP: no fields."""
        self._controller.stop()
        return self._ok_reply()

    def recover(self, params):
        """RECOVER: no fields."""
        self._controller.recover()
        return self._ok_reply()

    def shutdown(self, params):
        """SHUTDOWN: no fields."""
        self._controller.shutdown()
        return self._ok_reply()
702
703 class TestbedControllerProxy(object):
704     def __init__(self, root_dir, log_level, testbed_id = None, 
705             testbed_version = None, launch = True, host = None, 
706             port = None, user = None, ident_key = None, agent = None,
707             environment_setup = ""):
708         if launch:
709             if testbed_id == None or testbed_version == None:
710                 raise RuntimeError("To launch a TesbedInstance server a \
711                         testbed_id and testbed_version are required")
712             # ssh
713             if host != None:
714                 python_code = "from nepi.util.proxy import "\
715                         "TestbedControllerServer;"\
716                         "s = TestbedControllerServer('%s', %d, '%s', '%s');"\
717                         "s.run()" % (root_dir, log_level, testbed_id, 
718                                 testbed_version)
719                 proc = server.popen_ssh_subprocess(python_code, host = host,
720                     port = port, user = user, agent = agent,
721                     ident_key = ident_key,
722                     environment_setup = environment_setup,
723                     waitcommand = True)
724                 if proc.poll():
725                     err = proc.stderr.read()
726                     raise RuntimeError("Server could not be executed: %s" % \
727                             err)
728             else:
729                 # launch daemon
730                 s = TestbedControllerServer(root_dir, log_level, testbed_id, 
731                     testbed_version)
732                 s.run()
733
734         # connect client to server
735         self._client = server.Client(root_dir, host = host, port = port, 
736                 user = user, agent = agent, 
737                 environment_setup = environment_setup)
738
739     @property
740     def guids(self):
741         msg = testbed_messages[GUIDS]
742         self._client.send_msg(msg)
743         reply = self._client.read_reply()
744         if not reply:
745             raise RuntimeError, "Invalid reply: %r" % (reply,)
746         result = reply.split("|")
747         code = int(result[0])
748         text = base64.b64decode(result[1])
749         if code == ERROR:
750             raise RuntimeError(text)
751         guids = cPickle.loads(text)
752         return guids
753
754     @property
755     def testbed_id(self):
756         msg = testbed_messages[TESTBED_ID]
757         self._client.send_msg(msg)
758         reply = self._client.read_reply()
759         if not reply:
760             raise RuntimeError, "Invalid reply: %r" % (reply,)
761         result = reply.split("|")
762         code = int(result[0])
763         text = base64.b64decode(result[1])
764         if code == ERROR:
765             raise RuntimeError(text)
766         return str(text)
767
768     @property
769     def testbed_version(self):
770         msg = testbed_messages[TESTBED_VERSION]
771         self._client.send_msg(msg)
772         reply = self._client.read_reply()
773         if not reply:
774             raise RuntimeError, "Invalid reply: %r" % (reply,)
775         result = reply.split("|")
776         code = int(result[0])
777         text = base64.b64decode(result[1])
778         if code == ERROR:
779             raise RuntimeError(text)
780         return str(text)
781
782     def defer_configure(self, name, value):
783         msg = testbed_messages[CONFIGURE]
784         type = get_type(value)
785         # avoid having "|" in this parameters
786         name = base64.b64encode(name)
787         value = base64.b64encode(str(value))
788         msg = msg % (name, value, type)
789         self._client.send_msg(msg)
790         reply = self._client.read_reply()
791         if not reply:
792             raise RuntimeError, "Invalid reply: %r" % (reply,)
793         result = reply.split("|")
794         code = int(result[0])
795         text = base64.b64decode(result[1])
796         if code == ERROR:
797             raise RuntimeError(text)
798
799     def defer_create(self, guid, factory_id):
800         msg = testbed_messages[CREATE]
801         msg = msg % (guid, factory_id)
802         self._client.send_msg(msg)
803         reply = self._client.read_reply()
804         if not reply:
805             raise RuntimeError, "Invalid reply: %r" % (reply,)
806         result = reply.split("|")
807         code = int(result[0])
808         text = base64.b64decode(result[1])
809         if code == ERROR:
810             raise RuntimeError(text)
811
812     def defer_create_set(self, guid, name, value):
813         msg = testbed_messages[CREATE_SET]
814         type = get_type(value)
815         # avoid having "|" in this parameters
816         name = base64.b64encode(name)
817         value = base64.b64encode(str(value))
818         msg = msg % (guid, name, value, type)
819         self._client.send_msg(msg)
820         reply = self._client.read_reply()
821         if not reply:
822             raise RuntimeError, "Invalid reply: %r" % (reply,)
823         result = reply.split("|")
824         code = int(result[0])
825         text = base64.b64decode(result[1])
826         if code == ERROR:
827             raise RuntimeError(text)
828
829     def defer_factory_set(self, guid, name, value):
830         msg = testbed_messages[FACTORY_SET]
831         type = get_type(value)
832         # avoid having "|" in this parameters
833         name = base64.b64encode(name)
834         value = base64.b64encode(str(value))
835         msg = msg % (guid, name, value, type)
836         self._client.send_msg(msg)
837         reply = self._client.read_reply()
838         if not reply:
839             raise RuntimeError, "Invalid reply: %r" % (reply,)
840         result = reply.split("|")
841         code = int(result[0])
842         text = base64.b64decode(result[1])
843         if code == ERROR:
844             raise RuntimeError(text)
845
846     def defer_connect(self, guid1, connector_type_name1, guid2, 
847             connector_type_name2): 
848         msg = testbed_messages[CONNECT]
849         msg = msg % (guid1, connector_type_name1, guid2, 
850             connector_type_name2)
851         self._client.send_msg(msg)
852         reply = self._client.read_reply()
853         if not reply:
854             raise RuntimeError, "Invalid reply: %r" % (reply,)
855         result = reply.split("|")
856         code = int(result[0])
857         text = base64.b64decode(result[1])
858         if code == ERROR:
859             raise RuntimeError(text)
860
861     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
862             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
863             cross_connector_type_name):
864         msg = testbed_messages[CROSS_CONNECT]
865         msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
866             cross_testbed_id, cross_factory_id, cross_connector_type_name)
867         self._client.send_msg(msg)
868         reply = self._client.read_reply()
869         if not reply:
870             raise RuntimeError, "Invalid reply: %r" % (reply,)
871         result = reply.split("|")
872         code = int(result[0])
873         text = base64.b64decode(result[1])
874         if code == ERROR:
875             raise RuntimeError(text)
876
877     def defer_add_trace(self, guid, trace_id):
878         msg = testbed_messages[ADD_TRACE]
879         msg = msg % (guid, trace_id)
880         self._client.send_msg(msg)
881         reply = self._client.read_reply()
882         if not reply:
883             raise RuntimeError, "Invalid reply: %r" % (reply,)
884         result = reply.split("|")
885         code = int(result[0])
886         text = base64.b64decode(result[1])
887         if code == ERROR:
888             raise RuntimeError(text)
889
890     def defer_add_address(self, guid, address, netprefix, broadcast): 
891         msg = testbed_messages[ADD_ADDRESS]
892         msg = msg % (guid, address, netprefix, broadcast)
893         self._client.send_msg(msg)
894         reply = self._client.read_reply()
895         if not reply:
896             raise RuntimeError, "Invalid reply: %r" % (reply,)
897         result = reply.split("|")
898         code = int(result[0])
899         text = base64.b64decode(result[1])
900         if code == ERROR:
901             raise RuntimeError(text)
902
903     def defer_add_route(self, guid, destination, netprefix, nexthop):
904         msg = testbed_messages[ADD_ROUTE]
905         msg = msg % (guid, destination, netprefix, nexthop)
906         self._client.send_msg(msg)
907         reply = self._client.read_reply()
908         if not reply:
909             raise RuntimeError, "Invalid reply: %r" % (reply,)
910         result = reply.split("|")
911         code = int(result[0])
912         text = base64.b64decode(result[1])
913         if code == ERROR:
914             raise RuntimeError(text)
915
916     def do_setup(self):
917         msg = testbed_messages[DO_SETUP]
918         self._client.send_msg(msg)
919         reply = self._client.read_reply()
920         if not reply:
921             raise RuntimeError, "Invalid reply: %r" % (reply,)
922         result = reply.split("|")
923         code = int(result[0])
924         text = base64.b64decode(result[1])
925         if code == ERROR:
926             raise RuntimeError(text)
927
928     def do_create(self):
929         msg = testbed_messages[DO_CREATE]
930         self._client.send_msg(msg)
931         reply = self._client.read_reply()
932         if not reply:
933             raise RuntimeError, "Invalid reply: %r" % (reply,)
934         result = reply.split("|")
935         code = int(result[0])
936         text = base64.b64decode(result[1])
937         if code == ERROR:
938             raise RuntimeError(text)
939
940     def do_connect_init(self):
941         msg = testbed_messages[DO_CONNECT_INIT]
942         self._client.send_msg(msg)
943         reply = self._client.read_reply()
944         if not reply:
945             raise RuntimeError, "Invalid reply: %r" % (reply,)
946         result = reply.split("|")
947         code = int(result[0])
948         text = base64.b64decode(result[1])
949         if code == ERROR:
950             raise RuntimeError(text)
951
952     def do_connect_compl(self):
953         msg = testbed_messages[DO_CONNECT_COMPL]
954         self._client.send_msg(msg)
955         reply = self._client.read_reply()
956         if not reply:
957             raise RuntimeError, "Invalid reply: %r" % (reply,)
958         result = reply.split("|")
959         code = int(result[0])
960         text = base64.b64decode(result[1])
961         if code == ERROR:
962             raise RuntimeError(text)
963
964     def do_configure(self):
965         msg = testbed_messages[DO_CONFIGURE]
966         self._client.send_msg(msg)
967         reply = self._client.read_reply()
968         if not reply:
969             raise RuntimeError, "Invalid reply: %r" % (reply,)
970         result = reply.split("|")
971         code = int(result[0])
972         text = base64.b64decode(result[1])
973         if code == ERROR:
974             raise RuntimeError(text)
975
976     def do_preconfigure(self):
977         msg = testbed_messages[DO_PRECONFIGURE]
978         self._client.send_msg(msg)
979         reply = self._client.read_reply()
980         if not reply:
981             raise RuntimeError, "Invalid reply: %r" % (reply,)
982         result = reply.split("|")
983         code = int(result[0])
984         text = base64.b64decode(result[1])
985         if code == ERROR:
986             raise RuntimeError(text)
987
988     def do_cross_connect_init(self, cross_data):
989         msg = testbed_messages[DO_CROSS_CONNECT_INIT]
990         pcross_data = cPickle.dumps(cross_data)
991         cross_data = base64.b64encode(pcross_data)
992         msg = msg % (cross_data)
993         self._client.send_msg(msg)
994         reply = self._client.read_reply()
995         if not reply:
996             raise RuntimeError, "Invalid reply: %r" % (reply,)
997         result = reply.split("|")
998         code = int(result[0])
999         text = base64.b64decode(result[1])
1000         if code == ERROR:
1001             raise RuntimeError(text)
1002
1003     def do_cross_connect_compl(self, cross_data):
1004         msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
1005         pcross_data = cPickle.dumps(cross_data)
1006         cross_data = base64.b64encode(pcross_data)
1007         msg = msg % (cross_data)
1008         self._client.send_msg(msg)
1009         reply = self._client.read_reply()
1010         if not reply:
1011             raise RuntimeError, "Invalid reply: %r" % (reply,)
1012         result = reply.split("|")
1013         code = int(result[0])
1014         text = base64.b64decode(result[1])
1015         if code == ERROR:
1016             raise RuntimeError(text)
1017
1018     def start(self, time = TIME_NOW):
1019         msg = testbed_messages[START]
1020         self._client.send_msg(msg)
1021         reply = self._client.read_reply()
1022         if not reply:
1023             raise RuntimeError, "Invalid reply: %r" % (reply,)
1024         result = reply.split("|")
1025         code = int(result[0])
1026         text = base64.b64decode(result[1])
1027         if code == ERROR:
1028             raise RuntimeError(text)
1029
1030     def stop(self, time = TIME_NOW):
1031         msg = testbed_messages[STOP]
1032         self._client.send_msg(msg)
1033         reply = self._client.read_reply()
1034         if not reply:
1035             raise RuntimeError, "Invalid reply: %r" % (reply,)
1036         result = reply.split("|")
1037         code = int(result[0])
1038         text = base64.b64decode(result[1])
1039         if code == ERROR:
1040             raise RuntimeError(text)
1041
1042     def set(self, guid, name, value, time = TIME_NOW):
1043         msg = testbed_messages[SET]
1044         type = get_type(value)
1045         # avoid having "|" in this parameters
1046         name = base64.b64encode(name)
1047         value = base64.b64encode(str(value))
1048         msg = msg % (guid, name, value, type, time)
1049         self._client.send_msg(msg)
1050         reply = self._client.read_reply()
1051         if not reply:
1052             raise RuntimeError, "Invalid reply: %r" % (reply,)
1053         result = reply.split("|")
1054         code = int(result[0])
1055         text = base64.b64decode(result[1])
1056         if code == ERROR:
1057             raise RuntimeError(text)
1058
1059     def get(self, guid, name, time = TIME_NOW):
1060         msg = testbed_messages[GET]
1061         # avoid having "|" in this parameters
1062         name = base64.b64encode(name)
1063         msg = msg % (guid, name, time)
1064         self._client.send_msg(msg)
1065         reply = self._client.read_reply()
1066         if not reply:
1067             raise RuntimeError, "Invalid reply: %r" % (reply,)
1068         result = reply.split("|")
1069         code = int(result[0])
1070         text = base64.b64decode(result[1])
1071         if code == ERROR:
1072             raise RuntimeError(text)
1073         return text
1074
1075     def get_address(self, guid, index, attribute):
1076         msg = testbed_messages[GET_ADDRESS]
1077         # avoid having "|" in this parameters
1078         attribute = base64.b64encode(attribute)
1079         msg = msg % (guid, index, attribute)
1080         self._client.send_msg(msg)
1081         reply = self._client.read_reply()
1082         if not reply:
1083             raise RuntimeError, "Invalid reply: %r" % (reply,)
1084         result = reply.split("|")
1085         code = int(result[0])
1086         text = base64.b64decode(result[1])
1087         if code == ERROR:
1088             raise RuntimeError(text)
1089         return text
1090
1091     def get_route(self, guid, index, attribute):
1092         msg = testbed_messages[GET_ROUTE]
1093         # avoid having "|" in this parameters
1094         attribute = base64.b64encode(attribute)
1095         msg = msg % (guid, index, attribute)
1096         self._client.send_msg(msg)
1097         reply = self._client.read_reply()
1098         if not reply:
1099             raise RuntimeError, "Invalid reply: %r" % (reply,)
1100         result = reply.split("|")
1101         code = int(result[0])
1102         text = base64.b64decode(result[1])
1103         if code == ERROR:
1104             raise RuntimeError(text)
1105         return text
1106
1107     def action(self, time, guid, action):
1108         msg = testbed_messages[ACTION]
1109         msg = msg % (time, guid, action)
1110         self._client.send_msg(msg)
1111         reply = self._client.read_reply()
1112         if not reply:
1113             raise RuntimeError, "Invalid reply: %r" % (reply,)
1114         result = reply.split("|")
1115         code = int(result[0])
1116         text = base64.b64decode(result[1])
1117         if code == ERROR:
1118             raise RuntimeError(text)
1119
1120     def status(self, guid = None):
1121         msg = testbed_messages[STATUS]
1122         msg = msg % (guid,)
1123         self._client.send_msg(msg)
1124         reply = self._client.read_reply()
1125         if not reply:
1126             raise RuntimeError, "Invalid reply: %r" % (reply,)
1127         result = reply.split("|")
1128         code = int(result[0])
1129         text = base64.b64decode(result[1])
1130         if code == ERROR:
1131             raise RuntimeError(text)
1132         return int(text)
1133
1134     def trace(self, guid, trace_id, attribute='value'):
1135         msg = testbed_messages[TRACE]
1136         attribute = base64.b64encode(attribute)
1137         msg = msg % (guid, trace_id, attribute)
1138         self._client.send_msg(msg)
1139         reply = self._client.read_reply()
1140         if not reply:
1141             raise RuntimeError, "Invalid reply: %r" % (reply,)
1142         result = reply.split("|")
1143         code = int(result[0])
1144         text = base64.b64decode(result[1])
1145         if code == ERROR:
1146             raise RuntimeError(text)
1147         return text
1148
1149     def get_attribute_list(self, guid):
1150         msg = testbed_messages[GET_ATTRIBUTE_LIST]
1151         msg = msg % (guid,)
1152         self._client.send_msg(msg)
1153         reply = self._client.read_reply()
1154         if not reply:
1155             raise RuntimeError, "Invalid reply: %r" % (reply,)
1156         result = reply.split("|")
1157         code = int(result[0])
1158         text = base64.b64decode(result[1])
1159         if code == ERROR:
1160             raise RuntimeError(text)
1161         attr_list = cPickle.loads(text)
1162         return attr_list
1163
1164     def shutdown(self):
1165         msg = testbed_messages[SHUTDOWN]
1166         self._client.send_msg(msg)
1167         reply = self._client.read_reply()
1168         if not reply:
1169             raise RuntimeError, "Invalid reply: %r" % (reply,)
1170         result = reply.split("|")
1171         code = int(result[0])
1172         text = base64.b64decode(result[1])
1173         if code == ERROR:
1174             raise RuntimeError(text)
1175         self._client.send_stop()
1176         self._client.read_reply() # wait for it
1177
class ExperimentControllerProxy(object):
    """Proxy that talks to an ExperimentControllerServer through a client.

    Every public method formats a protocol message, sends it with
    self._client and interprets the "<code>|<base64 text>" reply.
    """

    def __init__(self, root_dir, log_level, experiment_xml = None,
            launch = True, host = None, port = None, user = None,
            ident_key = None, agent = None, environment_setup = ""):
        """Optionally launch the server, then connect a client to it.

        Args:
            root_dir: passed to the server (when launched) and to
                server.Client.
            log_level: passed to the server when it is launched.
            experiment_xml: XML description of the experiment; required
                when `launch` is true.
            launch: when true, an ExperimentControllerServer is started
                before the client connects.
            host: when not None, the server is started through
                server.popen_ssh_subprocess on that host; otherwise it is
                started locally by calling ExperimentControllerServer.run().
            port, user, ident_key, agent, environment_setup: forwarded to
                server.popen_ssh_subprocess; all of them except ident_key
                are also forwarded to server.Client.

        Raises:
            RuntimeError: if `launch` is true and `experiment_xml` is None,
                or if the ssh subprocess reports a non-zero exit status.
        """
        if launch:
            # launch server
            if experiment_xml is None:
                raise RuntimeError("To launch a ExperimentControllerServer a "
                        "xml description of the experiment is required")
            if host is not None:
                # ssh
                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
                        s = ExperimentControllerServer(%r, %r, %r);\
                        s.run()" % (root_dir, log_level, experiment_xml)
                proc = server.popen_ssh_subprocess(python_code, host = host,
                    port = port, user = user, agent = agent,
                    ident_key = ident_key,
                    environment_setup = environment_setup,
                    waitcommand = True)
                # poll() is truthy only for a non-zero exit status
                if proc.poll():
                    err = proc.stderr.read()
                    raise RuntimeError("Server could not be executed: %s" % \
                            err)
            else:
                # launch daemon
                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
                s.run()

        # connect client to server
        self._client = server.Client(root_dir, host = host, port = port,
                user = user, agent = agent,
                environment_setup = environment_setup)

    def _exchange(self, msg):
        """Send `msg` and return the parsed reply as a (code, text) pair.

        Raises:
            RuntimeError: if the reply is empty.
        """
        self._client.send_msg(msg)
        reply = self._client.read_reply()
        if not reply:
            raise RuntimeError("Invalid reply: %r" % (reply,))
        result = reply.split("|")
        code = int(result[0])
        text = base64.b64decode(result[1])
        return code, text

    def _request(self, msg):
        """Send `msg` and return the decoded reply text.

        Raises:
            RuntimeError: if the reply is empty, or if its code is ERROR
                (the decoded reply text becomes the exception message).
        """
        code, text = self._exchange(msg)
        if code == ERROR:
            raise RuntimeError(text)
        return text

    @property
    def experiment_xml(self):
        """Decoded reply text of the XML request.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        return self._request(controller_messages[XML])

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Send a TRACE request and return the decoded reply text.

        Args:
            testbed_guid, guid, trace_id: substituted into the TRACE
                message template.
            attribute: sent base64-encoded; defaults to 'value'.

        Raises:
            RuntimeError: on an empty reply, or when the reply code is
                anything other than OK.
        """
        msg = controller_messages[TRACE]
        attribute = base64.b64encode(attribute)
        msg = msg % (testbed_guid, guid, trace_id, attribute)
        code, text = self._exchange(msg)
        # unlike the other requests, only an explicit OK counts as success
        if code == OK:
            return text
        raise RuntimeError(text)

    def start(self):
        """Send the START request.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        self._request(controller_messages[START])

    def stop(self):
        """Send the STOP request.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        self._request(controller_messages[STOP])

    def recover(self):
        """Send the RECOVER request.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        self._request(controller_messages[RECOVER])

    def is_finished(self, guid):
        """Send a FINISHED request for `guid`.

        Returns:
            True when the decoded reply text equals "True", else False.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        # explicit 1-tuple so the formatting does not depend on guid's type
        msg = controller_messages[FINISHED] % (guid,)
        return self._request(msg) == "True"

    def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
        """Send an EXPERIMENT_SET request.

        Args:
            testbed_guid, guid, time: substituted into the message template.
            name: attribute name; sent base64-encoded.
            value: attribute value; converted with str() and sent
                base64-encoded, followed by whatever get_type(value) returns.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        msg = testbed_messages[EXPERIMENT_SET]
        # computed before `value` is replaced by its encoded form
        value_type = get_type(value)
        # avoid having "|" in these parameters
        name = base64.b64encode(name)
        value = base64.b64encode(str(value))
        msg = msg % (testbed_guid, guid, name, value, value_type, time)
        self._request(msg)

    def get(self, testbed_guid, guid, name, time = TIME_NOW):
        """Send an EXPERIMENT_GET request and return the decoded reply text.

        Args:
            testbed_guid, guid, time: substituted into the message template.
            name: attribute name; sent base64-encoded.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code.
        """
        msg = testbed_messages[EXPERIMENT_GET]
        # avoid having "|" in this parameter
        name = base64.b64encode(name)
        msg = msg % (testbed_guid, guid, name, time)
        return self._request(msg)

    def shutdown(self):
        """Send the SHUTDOWN request, then stop the client connection.

        After a non-error reply, calls self._client.send_stop() and waits
        for one more reply, whose content is ignored.

        Raises:
            RuntimeError: on an empty reply or an ERROR reply code; in both
                cases send_stop() is not called.
        """
        self._request(controller_messages[SHUTDOWN])
        self._client.send_stop()
        self._client.read_reply() # wait for it
1337