Make multicall threadsafe
[nepi.git] / src / nepi / testbeds / planetlab / plcapi.py
index 4e6b89d..c4aa111 100644 (file)
@@ -2,6 +2,7 @@ import xmlrpclib
 import functools
 import socket
 import time
+import threading
 
 def _retry(fn):
     def rv(*p, **kw):
@@ -82,18 +83,29 @@ class PLCAPI(object):
             self.auth = dict(AuthMethod='anonymous')
         
         self._localPeerName = localPeerName
+        self._url = urlpattern % {'hostname':hostname}
         
-        self.api = xmlrpclib.ServerProxy(
-            urlpattern % {'hostname':hostname},
+        self.threadlocal = threading.local()
+    
+    @property
+    def api(self):
+        # Cannot reuse same proxy in all threads, py2.7 is not threadsafe
+        return xmlrpclib.ServerProxy(
+            self._url ,
             allow_none = True)
         
-        self._multi = False
+    @property
+    def mcapi(self):
+        try:
+            return self.threadlocal.mc
+        except AttributeError:
+            return self.api
         
     def test(self):
         import warnings
         
         # validate XMLRPC server checking supported API calls
-        methods = set(_retry(self.api.system.listMethods)())
+        methods = set(_retry(self.mcapi.system.listMethods)())
         if self._required_methods - methods:
             warnings.warn("Unsupported REQUIRED methods: %s" % ( ", ".join(sorted(self._required_methods - methods)), ) )
             return False
@@ -102,7 +114,7 @@ class PLCAPI(object):
         
         try:
             # test authorization
-            network_types = _retry(self.api.GetNetworkTypes)(self.auth)
+            network_types = _retry(self.mcapi.GetNetworkTypes)(self.auth)
         except (xmlrpclib.ProtocolError, xmlrpclib.Fault),e:
             warnings.warn(str(e))
         
@@ -114,7 +126,7 @@ class PLCAPI(object):
         try:
             return self._network_types
         except AttributeError:
-            self._network_types = _retry(self.api.GetNetworkTypes)(self.auth)
+            self._network_types = _retry(self.mcapi.GetNetworkTypes)(self.auth)
             return self._network_types
     
     @property
@@ -122,7 +134,7 @@ class PLCAPI(object):
         try:
             return self._peer_map
         except AttributeError:
-            peers = _retry(self.api.GetPeers)(self.auth, {}, ['shortname','peername','peer_id'])
+            peers = _retry(self.mcapi.GetPeers)(self.auth, {}, ['shortname','peername','peer_id'])
             self._peer_map = dict(
                 (peer['shortname'], peer['peer_id'])
                 for peer in peers
@@ -162,7 +174,7 @@ class PLCAPI(object):
         """
         if not isinstance(node, (str, int, long)):
             raise ValueError, "Node must be either a non-unicode string or an int"
-        return _retry(self.api.GetNodeFlavour)(self.auth, node)
+        return _retry(self.mcapi.GetNodeFlavour)(self.auth, node)
     
     def GetNodes(self, nodeIdOrName=None, fields=None, **kw):
         """
@@ -193,7 +205,7 @@ class PLCAPI(object):
         else:
             fieldstuple = ()
         if nodeIdOrName is not None:
-            return _retry(self.api.GetNodes)(self.auth, nodeIdOrName, *fieldstuple)
+            return _retry(self.mcapi.GetNodes)(self.auth, nodeIdOrName, *fieldstuple)
         else:
             filters = kw.pop('filters',{})
             
@@ -224,7 +236,7 @@ class PLCAPI(object):
                 filters['peer_id'] = peer_filter
             
             filters.update(kw)
-            return _retry(self.api.GetNodes)(self.auth, filters, *fieldstuple)
+            return _retry(self.mcapi.GetNodes)(self.auth, filters, *fieldstuple)
     
     def GetNodeTags(self, nodeTagId=None, fields=None, **kw):
         if fields is not None:
@@ -232,11 +244,11 @@ class PLCAPI(object):
         else:
             fieldstuple = ()
         if nodeTagId is not None:
-            return _retry(self.api.GetNodeTags)(self.auth, nodeTagId, *fieldstuple)
+            return _retry(self.mcapi.GetNodeTags)(self.auth, nodeTagId, *fieldstuple)
         else:
             filters = kw.pop('filters',{})
             filters.update(kw)
-            return _retry(self.api.GetNodeTags)(self.auth, filters, *fieldstuple)
+            return _retry(self.mcapi.GetNodeTags)(self.auth, filters, *fieldstuple)
 
     def GetSliceTags(self, sliceTagId=None, fields=None, **kw):
         if fields is not None:
@@ -244,11 +256,11 @@ class PLCAPI(object):
         else:
             fieldstuple = ()
         if sliceTagId is not None:
-            return _retry(self.api.GetSliceTags)(self.auth, sliceTagId, *fieldstuple)
+            return _retry(self.mcapi.GetSliceTags)(self.auth, sliceTagId, *fieldstuple)
         else:
             filters = kw.pop('filters',{})
             filters.update(kw)
-            return _retry(self.api.GetSliceTags)(self.auth, filters, *fieldstuple)
+            return _retry(self.mcapi.GetSliceTags)(self.auth, filters, *fieldstuple)
         
     
     def GetInterfaces(self, interfaceIdOrIp=None, fields=None, **kw):
@@ -257,11 +269,11 @@ class PLCAPI(object):
         else:
             fieldstuple = ()
         if interfaceIdOrIp is not None:
-            return _retry(self.api.GetInterfaces)(self.auth, interfaceIdOrIp, *fieldstuple)
+            return _retry(self.mcapi.GetInterfaces)(self.auth, interfaceIdOrIp, *fieldstuple)
         else:
             filters = kw.pop('filters',{})
             filters.update(kw)
-            return _retry(self.api.GetInterfaces)(self.auth, filters, *fieldstuple)
+            return _retry(self.mcapi.GetInterfaces)(self.auth, filters, *fieldstuple)
         
     def GetSlices(self, sliceIdOrName=None, fields=None, **kw):
         if fields is not None:
@@ -269,27 +281,20 @@ class PLCAPI(object):
         else:
             fieldstuple = ()
         if sliceIdOrName is not None:
-            return _retry(self.api.GetSlices)(self.auth, sliceIdOrName, *fieldstuple)
+            return _retry(self.mcapi.GetSlices)(self.auth, sliceIdOrName, *fieldstuple)
         else:
             filters = kw.pop('filters',{})
             filters.update(kw)
-            return _retry(self.api.GetSlices)(self.auth, filters, *fieldstuple)
+            return _retry(self.mcapi.GetSlices)(self.auth, filters, *fieldstuple)
         
     def UpdateSlice(self, sliceIdOrName, **kw):
-        return _retry(self.api.UpdateSlice)(self.auth, sliceIdOrName, kw)
+        return _retry(self.mcapi.UpdateSlice)(self.auth, sliceIdOrName, kw)
         
 
     def StartMulticall(self):
-        if not self._multi:
-            self._api = self.api
-            self.api = xmlrpclib.MultiCall(self._api)
-            self._multi = True
+        self.threadlocal.mc = xmlrpclib.MultiCall(self.mcapi)
     
     def FinishMulticall(self):
-        if self._multi:
-            rv = self.api()
-            self.api = self._api
-            self._multi = False
-            return rv
-        else:
-            return []
+        mc = self.threadlocal.mc
+        del self.threadlocal.mc
+        return _retry(mc)()