Several small fixes to logging queue:
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 31 May 2012 06:19:23 +0000 (03:19 -0300)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 31 May 2012 06:19:23 +0000 (03:19 -0300)
 * Separate forward and backward queue logs
 * Add an "interval" so that both immediate and timed measurements are possible
 * Wake up logging thread on append when doing immediate logging
 * Properly implement the outpath-injected args attribute
 * Add missing newlines in csv log

src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/loggingclassqueue.py

index acc8c8f..dddc005 100644 (file)
@@ -582,7 +582,8 @@ class LoggingClassQueueFilter(ClassQueueFilter):
     _TRACEMAP = ClassQueueFilter._TRACEMAP.copy()
     _TRACEMAP.update({
         # tracename : (remotename, localname)
-        'queue_stats'   : ('queue_stats', 'queue_stats')
+        'queue_stats_f'   : ('queue_stats_f', 'queue_stats_f'),
+        'queue_stats_b'   : ('queue_stats_b', 'queue_stats_b'),
     })
     
     def __init__(self, api=None):
@@ -590,10 +591,14 @@ class LoggingClassQueueFilter(ClassQueueFilter):
         # Attributes
         self.module = "loggingclassqueue.py classqueue.py"
         
+    def _args_get(self):
         # Inject outpath
-        args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self.args or "").split(','))))
+        args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self._args or "").split(','))))
         args["outpath"] = "queue_stats"
-        self.args = ",".join(map("=".join, args.iteritems()))
+        return ",".join(map("=".join, args.iteritems()))
+    def _args_set(self, value):
+        self._args = value
+    args = property(_args_get, _args_set)
 
 class ToSQueueFilter(TunFilter):
     def __init__(self, api=None):
index eae9c7f..37a4c03 100644 (file)
@@ -1372,9 +1372,13 @@ traces = dict({
                 "name": "dropped_stats",
                 "help": "Information on dropped packets on a filer or queue associated to a network interface",
             }),
-    "queue_stats": dict({
-                "name": "queue_stats",
-                "help": "Detailled, fine-grained information on queue state, csv format.",
+    "queue_stats_f": dict({
+                "name": "queue_stats_f",
+                "help": "Detailled, fine-grained information on egress queue state, csv format.",
+            }),
+    "queue_stats_b": dict({
+                "name": "queue_stats_b",
+                "help": "Detailled, fine-grained information on ingress queue state, csv format.",
             }),
     })
 
@@ -1581,7 +1585,7 @@ factories_info = dict({
                 "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
             ],
             "connector_types": ["->fd","udp","tcp"],
-            "traces": ["dropped_stats","queue_stats"],
+            "traces": ["dropped_stats","queue_stats_f","queue_stats_b"],
         }),
     TOSQUEUEFILTER : dict({
             "help": "TUN classfull queue that classifies according to the TOS (RFC 791) IP field.\n\n"
index 9ad3ee9..439aa12 100644 (file)
@@ -9,6 +9,7 @@ import time
 import classqueue
 
 _outpath = "output"
+_interval = 0
 
 class QueueLogger(threading.Thread):
     def __init__(self, queues, drops, accepts, outpath):
@@ -22,17 +23,22 @@ class QueueLogger(threading.Thread):
         self._terminate = False
     
     def run(self):
+        if _interval > 0:
+            interval = _interval
+        else:
+            interval = 1
+        
         t0 = time.time()
-        with open(self.outpath, "w+") as outfile:
-            outfile.write(",".join(
+        with open(self.outpath, "w") as outfile:
+            outfile.writelines((",".join(
                 ["time"]
                 + map("q%02dlen".__mod__, xrange(len(self.queues)))
                 + map("q%02ddrops".__mod__, xrange(len(self.queues)))
                 + map("q%02daccepts".__mod__, xrange(len(self.queues)))
-            ))
+            ), "\n"))
             
             while not self._terminate:
-                self._event.wait(1)
+                self._event.wait(interval)
                 if self._terminate:
                     break
                 
@@ -46,12 +52,12 @@ class QueueLogger(threading.Thread):
                 queueaccepts = list(self.accepts)
                 self.accepts[:] = [0] * len(self.accepts)
                 
-                outfile.write(",".join(
-                    [str(time.time)]
+                outfile.writelines((",".join(
+                    [str(time.time()-t0)]
                     + map(str, queuelens)
                     + map(str, queuedrops)
                     + map(str, queueaccepts)
-                ))
+                ), "\n"))
                 outfile.flush()
             
     def terminate(self):
@@ -62,13 +68,15 @@ class QueueLogger(threading.Thread):
         self._event.set()
 
 class LoggingClassQueue(classqueue.ClassQueue):
+    outpath_suffix = itertools.cycle(('_f','_b'))
+    
     def __init__(self):
         self.accepts = []
         self.drops = []
         super(LoggingClassQueue, self).__init__()
         
         # Prepare logger thread
-        self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath)
+        self.logger = QueueLogger(self.queues, self.drops, self.accepts, _outpath+self.outpath_suffix.next())
         self.logger.start()
     
     def __del__(self):
@@ -88,11 +96,15 @@ class LoggingClassQueue(classqueue.ClassQueue):
         else:
             self.accepts[qi] += 1
         
+        if _interval == 0:
+            self.logger.wakeup()
+        
         return dropped
 
 queueclass = LoggingClassQueue
 
-def init(outpath="output", **kw):
-    global _outpath
+def init(outpath="output", interval=0, **kw):
+    global _outpath, _interval
     _outpath = outpath
+    _interval = interval
     classqueue.init(**kw)