--- /dev/null
+from manifold.operators import LAST_RECORD
+import threading
+
+#------------------------------------------------------------------
+# Class callback
+#------------------------------------------------------------------
+
+class Callback:
+ def __init__(self, deferred=None, router=None, cache_id=None):
+ #def __init__(self, deferred=None, event=None, router=None, cache_id=None):
+ self.results = []
+ self._deferred = deferred
+
+ #if not self.event:
+ self.event = threading.Event()
+ #else:
+ # self.event = event
+
+ # Used for caching...
+ self.router = router
+ self.cache_id = cache_id
+
+ def __call__(self, value):
+ # End of the list of records sent by Gateway
+ if value == LAST_RECORD:
+ if self.cache_id:
+ # Add query results to cache (expires in 30min)
+ #print "Result added to cached under id", self.cache_id
+ self.router.cache[self.cache_id] = (self.results, time.time() + CACHE_LIFETIME)
+
+ if self._deferred:
+ # Send results back using deferred object
+ self._deferred.callback(self.results)
+ else:
+ # Not using deferred, trigger the event to return results
+ self.event.set()
+ return self.event
+
+ # Not LAST_RECORD add the value to the results
+ self.results.append(value)
+
+ def wait(self):
+ self.event.wait()
+ self.event.clear()
+
+ def get_results(self):
+ self.wait()
+ return self.results
+