-# Copyright (c) 2009, 2010, 2011 Nicira Networks
+# Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import ovs.ovsuuid
import ovs.poller
import ovs.util
-import idltest
+
def unbox_json(json):
if type(json) == list and len(json) == 1:
else:
return json
+
def do_default_atoms():
for type_ in types.ATOMIC_TYPES:
if type_ == types.VoidType:
sys.stdout.write("OK\n")
+
def do_default_data():
any_errors = False
for n_min in 0, 1:
if any_errors:
sys.exit(1)
+
def do_parse_atomic_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
atomic_type = types.AtomicType.from_json(type_json)
print ovs.json.to_string(atomic_type.to_json(), sort_keys=True)
+
def do_parse_base_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
base_type = types.BaseType.from_json(type_json)
print ovs.json.to_string(base_type.to_json(), sort_keys=True)
+
def do_parse_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
type_ = types.Type.from_json(type_json)
print ovs.json.to_string(type_.to_json(), sort_keys=True)
+
def do_parse_atoms(type_string, *atom_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
base = types.BaseType.from_json(type_json)
except error.Error, e:
print unicode(e)
+
def do_parse_data(type_string, *data_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
type_ = types.Type.from_json(type_json)
datum = data.Datum.from_json(type_, datum_json)
print ovs.json.to_string(datum.to_json())
+
def do_sort_atoms(type_string, atom_strings):
type_json = unbox_json(ovs.json.from_string(type_string))
base = types.BaseType.from_json(type_json)
print ovs.json.to_string([data.Atom.to_json(atom)
for atom in sorted(atoms)])
+
def do_parse_column(name, column_string):
column_json = unbox_json(ovs.json.from_string(column_string))
column = ovs.db.schema.ColumnSchema.from_json(column_json, name)
print ovs.json.to_string(column.to_json(), sort_keys=True)
+
def do_parse_table(name, table_string, default_is_root_string='false'):
default_is_root = default_is_root_string == 'true'
table_json = unbox_json(ovs.json.from_string(table_string))
table = ovs.db.schema.TableSchema.from_json(table_json, name)
print ovs.json.to_string(table.to_json(default_is_root), sort_keys=True)
+
def do_parse_schema(schema_string):
schema_json = unbox_json(ovs.json.from_string(schema_string))
schema = ovs.db.schema.DbSchema.from_json(schema_json)
print ovs.json.to_string(schema.to_json(), sort_keys=True)
+
def print_idl(idl, step):
simple = idl.tables["simple"].rows
l1 = idl.tables["link1"].rows
for row in l2.itervalues():
s = ["%03d: i=%s l1=" % (step, row.i)]
if row.l1:
- s.append(str(row.l1.i))
+ s.append(str(row.l1[0].i))
s.append(" uuid=%s" % row.uuid)
print(''.join(s))
n += 1
print("%03d: empty" % step)
sys.stdout.flush()
+
def substitute_uuids(json, symtab):
if type(json) in [str, unicode]:
symbol = symtab.get(json)
return d
return json
+
def parse_uuids(json, symtab):
if type(json) in [str, unicode] and ovs.ovsuuid.is_valid_string(json):
name = "#%d#" % len(symtab)
for value in json.itervalues():
parse_uuids(value, symtab)
+
def idltest_find_simple(idl, i):
for row in idl.tables["simple"].rows.itervalues():
if row.i == i:
return row
return None
+
def idl_set(idl, commands, step):
txn = ovs.db.idl.Transaction(idl)
increment = False
'"%s"\n' % args[1])
sys.exit(1)
elif name == "increment":
- if len(args) != 2:
- sys.stderr.write('"increment" command requires 2 arguments\n')
+ if len(args) != 1:
+ sys.stderr.write('"increment" command requires 1 argument\n')
+ sys.exit(1)
+
+ s = idltest_find_simple(idl, int(args[0]))
+ if not s:
+ sys.stderr.write('"set" command asks for nonexistent i=%d\n'
+ % int(args[0]))
sys.exit(1)
- txn.increment(args[0], args[1], [])
+ s.increment("i")
increment = True
elif name == "abort":
txn.abort()
sys.stdout.flush()
txn.abort()
return
+ elif name == "linktest":
+ l1_0 = txn.insert(idl.tables["link1"])
+ l1_0.i = 1
+ l1_0.k = [l1_0]
+ l1_0.ka = [l1_0]
+ l1_1 = txn.insert(idl.tables["link1"])
+ l1_1.i = 2
+ l1_1.k = [l1_0]
+ l1_1.ka = [l1_0, l1_1]
+ elif name == 'getattrtest':
+ l1 = txn.insert(idl.tables["link1"])
+ i = getattr(l1, 'i', 1)
+ assert i == 1
+ l1.i = 2
+ i = getattr(l1, 'i', 1)
+ assert i == 2
+ l1.k = [l1]
else:
sys.stderr.write("unknown command %s\n" % name)
sys.exit(1)
sys.stdout.write("\n")
sys.stdout.flush()
+
def do_idl(schema_file, remote, *commands):
- schema = ovs.db.schema.DbSchema.from_json(ovs.json.from_file(schema_file))
- idl = ovs.db.idl.Idl(remote, schema)
+ schema_helper = ovs.db.idl.SchemaHelper(schema_file)
+ schema_helper.register_all()
+ idl = ovs.db.idl.Idl(remote, schema_helper)
if commands:
error, stream = ovs.stream.Stream.open_block(
idl.wait(poller)
rpc.wait(poller)
poller.block()
-
+
print_idl(idl, step)
step += 1
sys.stderr.write("jsonrpc transaction failed: %s"
% os.strerror(error))
sys.exit(1)
+ elif reply.error is not None:
+ sys.stderr.write("jsonrpc transaction failed: %s"
+ % reply.error)
+ sys.exit(1)
+
sys.stdout.write("%03d: " % step)
sys.stdout.flush()
step += 1
idl.close()
print("%03d: done" % step)
+
def usage():
print """\
%(program_name)s: test utility for Open vSwitch database Python bindings
""" % {'program_name': ovs.util.PROGRAM_NAME}
sys.exit(0)
+
def main(argv):
try:
options, args = getopt.gnu_getopt(argv[1:], 't:h',
func(*args)
+
if __name__ == '__main__':
try:
main(sys.argv)