+class NetPipe(object):
+ def __init__(self, api=None):
+ if not api:
+ api = plcapi.PLCAPI()
+ self._api = api
+
+ # Attributes
+ self.mode = None
+ self.addrList = None
+ self.portList = None
+
+ self.plrIn = None
+ self.bwIn = None
+ self.delayIn = None
+
+ self.plrOut = None
+ self.bwOut = None
+ self.delayOut = None
+
+ # These get initialized when the pipe is connected to its node
+ self.node = None
+ self.configured = False
+
+ def validate(self):
+ if not self.mode:
+ raise RuntimeError, "Undefined NetPipe mode"
+ if not self.portList:
+ raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
+ if not (self.plrIn or self.bwIn or self.delayIn):
+ raise RuntimeError, "Undefined NetPipe inbound characteristics"
+ if not (self.plrOut or self.bwOut or self.delayOut):
+ raise RuntimeError, "Undefined NetPipe outbound characteristics"
+ if not self.node:
+ raise RuntimeError, "Unconnected NetPipe"
+
+ def _add_pipedef(self, bw, plr, delay, options):
+ if delay:
+ options.extend(("delay","%dms" % (delay,)))
+ if bw:
+ options.extend(("bw","%.8fMbit/s" % (bw,)))
+ if plr:
+ options.extend(("plr","%.8f" % (plr,)))
+
+ def _get_ruledef(self):
+ scope = "%s%s%s" % (
+ self.portList,
+ "@" if self.addrList else "",
+ self.addrList or "",
+ )
+
+ options = []
+ if self.bwIn or self.plrIn or self.delayIn:
+ options.append("IN")
+ self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
+ if self.bwOut or self.plrOut or self.delayOut:
+ options.append("OUT")
+ self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
+ options = ' '.join(options)
+
+ return (scope,options)
+
+ def recover(self):
+ # Rules are safe on their nodes
+ self.configured = True
+
+ def configure(self):
+ # set up rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ # we have to clean up afterwards
+ self.configured = True
+
+ def refresh(self):
+ if self.configured:
+ # refresh rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ def cleanup(self):
+ if self.configured:
+ # remove rule
+ scope, options = self._get_ruledef()
+ command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
+
+ (out,err),proc = server.popen_ssh_command(
+ command,
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+ self.configured = False
+
+ def sync_trace(self, local_dir, whichtrace):
+ if whichtrace != 'netpipeStats':
+ raise ValueError, "Unsupported trace %s" % (whichtrace,)
+
+ local_path = os.path.join(local_dir, "netpipe_stats_%s" % (self.mode,))
+
+ # create parent local folders
+ proc = subprocess.Popen(
+ ["mkdir", "-p", os.path.dirname(local_path)],
+ stdout = open("/dev/null","w"),
+ stdin = open("/dev/null","r"))
+
+ if proc.wait():
+ raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
+
+ (out,err),proc = server.popen_ssh_command(
+ "echo 'Rules:' ; sudo -S netconfig show rules ; echo 'Pipes:' ; sudo -S netconfig show pipes",
+ host = self.node.hostname,
+ port = None,
+ user = self.node.slicename,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
+
+ if proc.wait():
+ raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
+
+ # dump results to file
+ f = open(local_path, "wb")
+ f.write(err or "")
+ f.write(out or "")
+ f.close()
+
+ return local_path
+
+class TunFilter(object):
+ _TRACEMAP = {
+ # tracename : (remotename, localname)
+ }
+
+ def __init__(self, api=None):
+ if not api:
+ api = plcapi.PLCAPI()
+ self._api = api
+
+ # Attributes
+ self.module = None
+ self.args = None
+
+ # These get initialised when the filter is connected
+ self.peer_guid = None
+ self.peer_proto = None
+ self.iface_guid = None
+ self.peer = None
+ self.iface = None
+
+ def _get(what, self):
+ wref = self.iface
+ if wref:
+ wref = wref()
+ if wref:
+ return getattr(wref, what)
+ else:
+ return None
+
+ def _set(what, self, val):
+ wref = self.iface
+ if wref:
+ wref = wref()
+ if wref:
+ setattr(wref, what, val)
+
+ tun_proto = property(
+ functools.partial(_get, 'tun_proto'),
+ functools.partial(_set, 'tun_proto') )
+ tun_addr = property(
+ functools.partial(_get, 'tun_addr'),
+ functools.partial(_set, 'tun_addr') )
+ tun_port = property(
+ functools.partial(_get, 'tun_port'),
+ functools.partial(_set, 'tun_port') )
+ tun_key = property(
+ functools.partial(_get, 'tun_key'),
+ functools.partial(_set, 'tun_key') )
+ tun_cipher = property(
+ functools.partial(_get, 'tun_cipher'),
+ functools.partial(_set, 'tun_cipher') )
+
+ del _get
+ del _set
+
+ def remote_trace_path(self, whichtrace):
+ iface = self.iface()
+ if iface is not None:
+ return iface.remote_trace_path(whichtrace, self._TRACEMAP)
+ return None
+
+ def remote_trace_name(self, whichtrace):
+ iface = self.iface()
+ if iface is not None:
+ return iface.remote_trace_name(whichtrace, self._TRACEMAP)
+ return None
+
+ def sync_trace(self, local_dir, whichtrace):
+ iface = self.iface()
+ if iface is not None:
+ return iface.sync_trace(local_dir, whichtrace, self._TRACEMAP)
+ return None
+
+class ClassQueueFilter(TunFilter):
+ _TRACEMAP = {
+ # tracename : (remotename, localname)
+ 'dropped_stats' : ('dropped_stats', 'dropped_stats')
+ }
+
+ def __init__(self, api=None):
+ super(ClassQueueFilter, self).__init__(api)
+ # Attributes
+ self.module = "classqueue.py"
+
+class LoggingClassQueueFilter(ClassQueueFilter):
+ _TRACEMAP = ClassQueueFilter._TRACEMAP.copy()
+ _TRACEMAP.update({
+ # tracename : (remotename, localname)
+ 'queue_stats_f' : ('queue_stats_f', 'queue_stats_f'),
+ 'queue_stats_b' : ('queue_stats_b', 'queue_stats_b'),
+ })
+
+ def __init__(self, api=None):
+ super(LoggingClassQueueFilter, self).__init__(api)
+ # Attributes
+ self.module = "loggingclassqueue.py classqueue.py"
+
+ def _args_get(self):
+ # Inject outpath
+ args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self._args or "").split(','))))
+ args["outpath"] = "queue_stats"
+ return ",".join(map("=".join, args.iteritems()))
+ def _args_set(self, value):
+ self._args = value
+ args = property(_args_get, _args_set)
+
+class ToSQueueFilter(TunFilter):
+ def __init__(self, api=None):
+ super(ToSQueueFilter, self).__init__(api)
+ # Attributes
+ self.module = "tosqueue.py"