element = self._elements[guid]
if element:
setattr(element, name, value)
+
+ if hasattr(element, 'refresh'):
+ # invoke attribute refresh hook
+ element.refresh()
def get(self, time, guid, name):
# TODO: take on account schedule time for the task
for trace in self._traces.values():
trace.close()
for element in self._elements.values():
- pass
- #element.destroy()
+ # invoke cleanup hooks
+ if hasattr(element, 'cleanup'):
+ element.cleanup()
def trace(self, guid, trace_id, attribute='value'):
app = self._elements[guid]
return iface
+ def _make_netpipe(self, parameters):
+ iface = self._interfaces.NetPipe(self.plapi)
+
+ # Note: there is 1-to-1 correspondence between attribute names
+ # If that changes, this has to change as well
+ for attr,val in parameters.iteritems():
+ setattr(iface, attr, val)
+
+ return iface
+
def _make_internet(self, parameters):
return self._interfaces.Internet(self.plapi)
from constants import TESTBED_ID
import nepi.util.ipaddr2 as ipaddr2
+import nepi.util.server as server
import plcapi
class NodeIface(object):
api = plcapi.PLCAPI()
self._api = api
+class NetPipe(object):
+ def __init__(self, api=None):
+ if not api:
+ api = plcapi.PLCAPI()
+ self._api = api
+
+ # Attributes
+ self.mode = None
+ self.addrList = None
+ self.portList = None
+
+ self.plrIn = None
+ self.bwIn = None
+ self.delayIn = None
+ self.plrOut = None
+ self.bwOut = None
+ self.delayOut = None
+
+ # These get initialized when the pipe is connected to its node
+ self.node = None
+ self.configured = False
+
+ def validate(self):
+ if not self.mode:
+ raise RuntimeError, "Undefined NetPipe mode"
+ if not self.portList:
+ raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
+ if not (self.plrIn or self.bwIn or self.delayIn):
+ raise RuntimeError, "Undefined NetPipe inbound characteristics"
+ if not (self.plrOut or self.bwOut or self.delayOut):
+ raise RuntimeError, "Undefined NetPipe outbound characteristics"
+ if not self.node:
+ raise RuntimeError, "Unconnected NetPipe"
+
+ def _add_pipedef(self, bw, plr, delay, options):
+ if delay:
+ options.extend(("delay","%dms" % (delay,)))
+ if bw:
+ options.extend(("bw","%.8fMbit/s" % (bw,)))
+ if plr:
+ options.extend(("plr","%.8f" % (plr,)))
+
+ def _get_ruledef(self):
+ scope = "%s%s%s" % (
+ self.portList,
+ "@" if self.addrList else "",
+ self.addrList or "",
+ )
+
+ options = []
+ if self.bwIn or self.plrIn or self.delayIn:
+ options.append("IN")
+ self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
+ if self.bwOut or self.plrOut or self.delayOut:
+ options.append("OUT")
+ self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
+ options = ' '.join(options)
+
+ return (scope,options)
+
+ def configure(self):
+ # set up rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
+ print command
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ # we have to clean up afterwards
+ self.configured = True
+
+ def refresh(self):
+ if self.configured:
+ # refresh rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ def cleanup(self):
+ if self.configured:
+ # remove rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ self.configured = False
+
TUNIFACE = "TunInterface"
APPLICATION = "Application"
INTERNET = "Internet"
+NETPIPE = "NetPipe"
PL_TESTBED_ID = "planetlab"
+
+### Custom validation functions ###
+def is_addrlist(attribute, value):
+ if not validation.is_string(attribute, value):
+ return False
+
+ if not value:
+ # No empty strings
+ return False
+
+ components = value.split(',')
+
+ for component in components:
+ if '/' in component:
+ addr, mask = component.split('/',1)
+ else:
+ addr, mask = component, 32
+
+ if mask is not None and not (mask and mask.isdigit()):
+ # No empty or nonnumeric masks
+ return False
+
+ if not validation.is_ip4_address(attribute, value):
+ # Address part must be ipv4
+ return False
+
+ return True
+
+def is_portlist(attribute, value):
+ if not validation.is_string(attribute, value):
+ return False
+
+ if not value:
+ # No empty strings
+ return False
+
+ components = value.split(',')
+
+ for component in components:
+ if '-' in component:
+ pfrom, pto = component.split('-',1)
+ else:
+ pfrom = pto = component
+
+ if not pfrom or not pto or not pfrom.isdigit() or not pto.isdigit():
+ # No empty or nonnumeric ports
+ return False
+
+ return True
+
+
### Connection functions ####
def connect_node_iface_node(testbed_instance, node, iface):
def connect_tun_iface_node(testbed_instance, node, iface):
if not node.emulation:
- raise RuntimeError, "Usage of TUN interfaces requires emulation"
+ raise RuntimeError, "Use of TUN interfaces requires emulation"
iface.node = node
node.required_vsys.update(('fd_tuntap', 'vif_up'))
if app.depends:
node.required_packages.update(set(
app.depends.split() ))
+
+def connect_node_netpipe(testbed_instance, node, netpipe):
+ if not node.emulation:
+ raise RuntimeError, "Use of NetPipes requires emulation"
+ netpipe.node = node
### Creation functions ###
element = testbed_instance._make_internet(parameters)
testbed_instance.elements[guid] = element
+def create_netpipe(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_netpipe(parameters)
+ testbed_instance.elements[guid] = element
+
### Start/Stop functions ###
def start_application(testbed_instance, guid):
# Install stuff
app.setup()
+def configure_netpipe(testbed_instance, guid):
+ netpipe = testbed_instance._elements[guid]
+
+ # Do some validations
+ netpipe.validate()
+
+ # Wait for dependencies
+ netpipe.node.wait_dependencies()
+
+ # Install rules
+ netpipe.configure()
+
### Factory information ###
connector_types = dict({
"max": 1,
"min": 1
}),
+ "pipes": dict({
+ "help": "Connector to a NetPipe",
+ "name": "pipes",
+ "max": 2,
+ "min": 0
+ }),
})
connections = [
"to": (TESTBED_ID, APPLICATION, "node"),
"code": connect_app,
"can_cross": False
- })
+ }),
+ dict({
+ "from": (TESTBED_ID, NODE, "pipes"),
+ "to": (TESTBED_ID, NETPIPE, "node"),
+ "code": connect_node_netpipe,
+ "can_cross": False
+ }),
]
attributes = dict({
"flags": Attribute.DesignOnly,
"validation_function": validation.is_string
}),
+
+ "netpipe_mode": dict({
+ "name": "mode",
+ "help": "Link mode:\n"
+ " * SERVER: applies to incoming connections\n"
+ " * CLIENT: applies to outgoing connections\n"
+ " * SERVICE: applies to both",
+ "type": Attribute.ENUM,
+ "flags": Attribute.DesignOnly,
+ "allowed": ["SERVER",
+ "CLIENT",
+ "SERVICE"],
+ "validation_function": validation.is_enum,
+ }),
+ "port_list": dict({
+ "name": "portList",
+ "help": "Port list or range. Eg: '22', '22,23,27', '20-2000'",
+ "type": Attribute.STRING,
+ "validation_function": is_portlist,
+ }),
+ "addr_list": dict({
+ "name": "addrList",
+ "help": "Address list or range. Eg: '127.0.0.1', '127.0.0.1,127.0.1.1', '127.0.0.1/8'",
+ "type": Attribute.STRING,
+ "validation_function": is_addrlist,
+ }),
+ "bw_in": dict({
+ "name": "bwIn",
+ "help": "Inbound bandwidth limit (in Mbit/s)",
+ "type": Attribute.DOUBLE,
+ "validation_function": validation.is_double,
+ }),
+ "bw_out": dict({
+ "name": "bwOut",
+ "help": "Outbound bandwidth limit (in Mbit/s)",
+ "type": Attribute.DOUBLE,
+ "validation_function": validation.is_double,
+ }),
+ "plr_in": dict({
+ "name": "plrIn",
+ "help": "Inbound packet loss rate (0 = no loss, 1 = 100% loss)",
+ "type": Attribute.DOUBLE,
+ "validation_function": validation.is_double,
+ }),
+ "plr_out": dict({
+ "name": "plrOut",
+ "help": "Outbound packet loss rate (0 = no loss, 1 = 100% loss)",
+ "type": Attribute.DOUBLE,
+ "validation_function": validation.is_double,
+ }),
+ "delay_in": dict({
+ "name": "delayIn",
+ "help": "Inbound packet delay (in milliseconds)",
+ "type": Attribute.INTEGER,
+ "range": (0,60000),
+ "validation_function": validation.is_integer,
+ }),
+ "delay_out": dict({
+ "name": "delayOut",
+ "help": "Outbound packet delay (in milliseconds)",
+ "type": Attribute.INTEGER,
+ "range": (0,60000),
+ "validation_function": validation.is_integer,
+ }),
})
traces = dict({
}),
})
-create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
-configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
+configure_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, NETPIPE, APPLICATION ]
factories_info = dict({
NODE: dict({
"min_bandwidth",
"max_bandwidth",
],
- "connector_types": ["devs", "apps"]
+ "connector_types": ["devs", "apps", "pipes"]
}),
NODEIFACE: dict({
"allow_addresses": True,
"create_function": create_internet,
"connector_types": ["devs"],
}),
+ NETPIPE: dict({
+ "help": "Link emulation",
+ "category": "topology",
+ "create_function": create_netpipe,
+ "configure_function": configure_netpipe,
+ "box_attributes": ["netpipe_mode",
+ "addr_list", "port_list",
+ "bw_in","plr_in","delay_in",
+ "bw_out","plr_out","delay_out"],
+ "connector_types": ["node"],
+ "traces": ["stdout", "stderr"]
+ }),
})
testbed_attributes = dict({
xml2 = exp_desc2.to_xml()
self.assertTrue(xml == xml2)
+ def test_design_emulation(self):
+ exp_desc, tstbd_desc, node1, node2, iface1, iface2, app = self.make_test_design()
+
+ node1.set_attribute_value("emulation",True)
+
+ netpipe1 = tstbd_desc.create("NetPipe")
+ netpipe1.set_attribute_value("mode","CLIENT")
+ netpipe1.set_attribute_value("portList","80,443")
+ netpipe1.set_attribute_value("bwIn",1.0)
+ netpipe1.set_attribute_value("bwOut",128.0/1024.0)
+ netpipe1.set_attribute_value("delayIn",12)
+ netpipe1.set_attribute_value("delayOut",92)
+ netpipe1.set_attribute_value("plrIn",0.05)
+ netpipe1.set_attribute_value("plrOut",0.15)
+ node1.connector("pipes").connect(netpipe1.connector("node"))
+
+ xml = exp_desc.to_xml()
+ exp_desc2 = ExperimentDescription()
+ exp_desc2.from_xml(xml)
+ xml2 = exp_desc2.to_xml()
+ self.assertTrue(xml == xml2)
+
if __name__ == '__main__':
unittest.main()
instance.stop()
instance.shutdown()
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_emulation(self):
+ instance = self.make_instance()
+
+ instance.defer_create(2, "Node")
+ instance.defer_create_set(2, "hostname", "onelab11.pl.sophia.inria.fr")
+ instance.defer_create_set(2, "emulation", True) # require emulation
+ instance.defer_create(3, "NodeInterface")
+ instance.defer_connect(2, "devs", 3, "node")
+ instance.defer_create(4, "Internet")
+ instance.defer_connect(3, "inet", 4, "devs")
+ instance.defer_create(7, "NetPipe")
+ instance.defer_create_set(7, "mode", "CLIENT")
+ instance.defer_create_set(7, "portList", "80")
+ instance.defer_create_set(7, "bwOut", 12.0/1024.0) # 12kbps
+ instance.defer_create_set(7, "bwIn", 64.0/1024.0) # 64kbps
+ instance.defer_create_set(7, "plrOut", 0.01) # 1% plr outbound - high loss
+ instance.defer_create_set(7, "plrIn", 0.001) # 0.1% plr inbound - regular loss
+ instance.defer_create_set(7, "delayOut", int(1500 * 8 / (12.0/1024.0) / 1000)) # tx delay at 12kbps in ms
+ instance.defer_create_set(7, "delayIn", int(1500 * 8 / (64.0/1024.0) / 1000)) # rx delay at 64kbps in ms
+ instance.defer_connect(2, "pipes", 7, "node")
+ instance.defer_create(8, "Application")
+ instance.defer_create_set(8, "command", "time wget -q -O /dev/null http://www.google.com/") # Fetch ~10kb
+ instance.defer_add_trace(8, "stderr")
+ instance.defer_connect(8, "node", 2, "apps")
+
+ instance.do_setup()
+ instance.do_create()
+ instance.do_connect()
+ instance.do_preconfigure()
+ instance.do_configure()
+
+ instance.start()
+ while instance.status(8) != STATUS_FINISHED:
+ time.sleep(0.5)
+ test_result = (instance.trace(8, "stderr") or "").strip()
+ comp_result = r".*real\s*(?P<min>[0-9]+)m(?P<sec>[0-9]+[.][0-9]+)s.*"
+
+ match = re.match(comp_result, test_result, re.MULTILINE)
+ self.assertTrue(match, "Unexpected output: %s" % (test_result,))
+
+ minutes = int(match.group("min"))
+ seconds = float(match.group("sec"))
+ self.assertTrue((minutes * 60 + seconds) > 1.0, "Emulation not effective: %s" % (test_result,))
+ instance.stop()
+ instance.shutdown()
+
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tun_emulation_requirement(self):
instance = self.make_instance()