README moves to markdown
[nepi.git] / src / nepi / util / parallel.py
index b7caeac..4b1acc8 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 
 import threading
-import Queue
 import traceback
 import sys
 import os
 
+from six.moves import queue
+
 N_PROCS = None
 
 class WorkerThread(threading.Thread):
@@ -65,12 +65,12 @@ class ParallelRun(object):
         self.maxqueue = maxqueue
         self.maxthreads = maxthreads
         
-        self.queue = Queue.Queue(self.maxqueue or 0)
+        self.queue = queue.Queue(self.maxqueue or 0)
         
         self.delayed_exceptions = []
         
         if results:
-            self.rvqueue = Queue.Queue()
+            self.rvqueue = queue.Queue()
         else:
             self.rvqueue = None
     
@@ -85,11 +85,8 @@ class ParallelRun(object):
         if maxthreads is None:
             if N_PROCS is None:
                 try:
-                    f = open("/proc/cpuinfo")
-                    try:
+                    with open("/proc/cpuinfo") as f:
                         N_PROCS = sum("processor" in l for l in f)
-                    finally:
-                        f.close()
                 except:
                     pass
             maxthreads = N_PROCS
@@ -100,7 +97,7 @@ class ParallelRun(object):
         self.workers = []
 
         # initialize workers
-        for x in xrange(maxthreads):
+        for x in range(maxthreads):
             worker = WorkerThread()
             worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
             worker.setDaemon(True)
@@ -115,7 +112,7 @@ class ParallelRun(object):
             try:
                 self.queue.get(block = False)
                 self.queue.task_done()
-            except Queue.Empty:
+            except queue.Empty:
                 break
   
     def destroy(self):
@@ -148,17 +145,17 @@ class ParallelRun(object):
         if self.delayed_exceptions:
             typ,val,loc = self.delayed_exceptions[0]
             del self.delayed_exceptions[:]
-            raise typ,val,loc
+            raise typ(val).with_traceback(loc)
         
     def __iter__(self):
         if self.rvqueue is not None:
             while True:
                 try:
                     yield self.rvqueue.get_nowait()
-                except Queue.Empty:
+                except queue.Empty:
                     self.queue.join()
                     try:
                         yield self.rvqueue.get_nowait()
-                    except Queue.Empty:
+                    except queue.Empty:
                         raise StopIteration