From dfeb7c0c9a41d5063e12def3e801c6e2d2840621 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Tue, 27 Sep 2011 04:00:59 +0200 Subject: [PATCH] Make multicall threadsafe --- src/nepi/testbeds/planetlab/plcapi.py | 65 ++++++++++++++------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/src/nepi/testbeds/planetlab/plcapi.py b/src/nepi/testbeds/planetlab/plcapi.py index 4e6b89dd..c4aa111c 100644 --- a/src/nepi/testbeds/planetlab/plcapi.py +++ b/src/nepi/testbeds/planetlab/plcapi.py @@ -2,6 +2,7 @@ import xmlrpclib import functools import socket import time +import threading def _retry(fn): def rv(*p, **kw): @@ -82,18 +83,29 @@ class PLCAPI(object): self.auth = dict(AuthMethod='anonymous') self._localPeerName = localPeerName + self._url = urlpattern % {'hostname':hostname} - self.api = xmlrpclib.ServerProxy( - urlpattern % {'hostname':hostname}, + self.threadlocal = threading.local() + + @property + def api(self): + # Cannot reuse same proxy in all threads, py2.7 is not threadsafe + return xmlrpclib.ServerProxy( + self._url , allow_none = True) - self._multi = False + @property + def mcapi(self): + try: + return self.threadlocal.mc + except AttributeError: + return self.api def test(self): import warnings # validate XMLRPC server checking supported API calls - methods = set(_retry(self.api.system.listMethods)()) + methods = set(_retry(self.mcapi.system.listMethods)()) if self._required_methods - methods: warnings.warn("Unsupported REQUIRED methods: %s" % ( ", ".join(sorted(self._required_methods - methods)), ) ) return False @@ -102,7 +114,7 @@ class PLCAPI(object): try: # test authorization - network_types = _retry(self.api.GetNetworkTypes)(self.auth) + network_types = _retry(self.mcapi.GetNetworkTypes)(self.auth) except (xmlrpclib.ProtocolError, xmlrpclib.Fault),e: warnings.warn(str(e)) @@ -114,7 +126,7 @@ class PLCAPI(object): try: return self._network_types except AttributeError: - self._network_types = _retry(self.api.GetNetworkTypes)(self.auth) + self._network_types = _retry(self.mcapi.GetNetworkTypes)(self.auth) return self._network_types @property @@ -122,7 +134,7 @@ class PLCAPI(object): try: return self._peer_map except AttributeError: - peers = _retry(self.api.GetPeers)(self.auth, {}, ['shortname','peername','peer_id']) + peers = _retry(self.mcapi.GetPeers)(self.auth, {}, ['shortname','peername','peer_id']) self._peer_map = dict( (peer['shortname'], peer['peer_id']) for peer in peers @@ -162,7 +174,7 @@ class PLCAPI(object): """ if not isinstance(node, (str, int, long)): raise ValueError, "Node must be either a non-unicode string or an int" - return _retry(self.api.GetNodeFlavour)(self.auth, node) + return _retry(self.mcapi.GetNodeFlavour)(self.auth, node) def GetNodes(self, nodeIdOrName=None, fields=None, **kw): """ @@ -193,7 +205,7 @@ class PLCAPI(object): else: fieldstuple = () if nodeIdOrName is not None: - return _retry(self.api.GetNodes)(self.auth, nodeIdOrName, *fieldstuple) + return _retry(self.mcapi.GetNodes)(self.auth, nodeIdOrName, *fieldstuple) else: filters = kw.pop('filters',{}) @@ -224,7 +236,7 @@ class PLCAPI(object): filters['peer_id'] = peer_filter filters.update(kw) - return _retry(self.api.GetNodes)(self.auth, filters, *fieldstuple) + return _retry(self.mcapi.GetNodes)(self.auth, filters, *fieldstuple) def GetNodeTags(self, nodeTagId=None, fields=None, **kw): if fields is not None: @@ -232,11 +244,11 @@ class PLCAPI(object): else: fieldstuple = () if nodeTagId is not None: - return _retry(self.api.GetNodeTags)(self.auth, nodeTagId, *fieldstuple) + return _retry(self.mcapi.GetNodeTags)(self.auth, nodeTagId, *fieldstuple) else: filters = kw.pop('filters',{}) filters.update(kw) - return _retry(self.api.GetNodeTags)(self.auth, filters, *fieldstuple) + return _retry(self.mcapi.GetNodeTags)(self.auth, filters, *fieldstuple) def GetSliceTags(self, sliceTagId=None, fields=None, **kw): if fields is not None: @@ -244,11 +256,11 @@ class PLCAPI(object): else: fieldstuple = () if sliceTagId is not None: - return _retry(self.api.GetSliceTags)(self.auth, sliceTagId, *fieldstuple) + return _retry(self.mcapi.GetSliceTags)(self.auth, sliceTagId, *fieldstuple) else: filters = kw.pop('filters',{}) filters.update(kw) - return _retry(self.api.GetSliceTags)(self.auth, filters, *fieldstuple) + return _retry(self.mcapi.GetSliceTags)(self.auth, filters, *fieldstuple) def GetInterfaces(self, interfaceIdOrIp=None, fields=None, **kw): @@ -257,11 +269,11 @@ class PLCAPI(object): else: fieldstuple = () if interfaceIdOrIp is not None: - return _retry(self.api.GetInterfaces)(self.auth, interfaceIdOrIp, *fieldstuple) + return _retry(self.mcapi.GetInterfaces)(self.auth, interfaceIdOrIp, *fieldstuple) else: filters = kw.pop('filters',{}) filters.update(kw) - return _retry(self.api.GetInterfaces)(self.auth, filters, *fieldstuple) + return _retry(self.mcapi.GetInterfaces)(self.auth, filters, *fieldstuple) def GetSlices(self, sliceIdOrName=None, fields=None, **kw): if fields is not None: @@ -269,27 +281,20 @@ class PLCAPI(object): else: fieldstuple = () if sliceIdOrName is not None: - return _retry(self.api.GetSlices)(self.auth, sliceIdOrName, *fieldstuple) + return _retry(self.mcapi.GetSlices)(self.auth, sliceIdOrName, *fieldstuple) else: filters = kw.pop('filters',{}) filters.update(kw) - return _retry(self.api.GetSlices)(self.auth, filters, *fieldstuple) + return _retry(self.mcapi.GetSlices)(self.auth, filters, *fieldstuple) def UpdateSlice(self, sliceIdOrName, **kw): - return _retry(self.api.UpdateSlice)(self.auth, sliceIdOrName, kw) + return _retry(self.mcapi.UpdateSlice)(self.auth, sliceIdOrName, kw) def StartMulticall(self): - if not self._multi: - self._api = self.api - self.api = xmlrpclib.MultiCall(self._api) - self._multi = True + self.threadlocal.mc = xmlrpclib.MultiCall(self.mcapi) def FinishMulticall(self): - if self._multi: - rv = self.api() - self.api = self._api - self._multi = False - return rv - else: - return [] + mc = self.threadlocal.mc + del self.threadlocal.mc + return _retry(mc)() -- 2.45.2