From: Arun Babu Neelicattu <abn(a)redhat.com>
- if login provides a token, reuse it for all subsequent requests sent
to that domain
- changes are transparent to user
- persist tokens in ~/.bugzillatoken
- ensure test cases are aware of tokens
---
bugzilla/base.py | 60 +++++++++++++++++++++++++++++++++++++++++++++++++-
tests/rw_functional.py | 16 ++++++++++----
2 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/bugzilla/base.py b/bugzilla/base.py
index f69abd5..a1cc239 100644
--- a/bugzilla/base.py
+++ b/bugzilla/base.py
@@ -105,6 +105,64 @@ def _build_cookiejar(cookiefile):
return retcj
+class BugzillaToken(object):
+
+ def __init__(self, uri, tokenfile=None):
+ self.tokenfilename = tokenfile if tokenfile is not None else \
+ os.path.expanduser('~/.bugzillatoken')
+ self.tokenfile = SafeConfigParser()
+ self.tokenfile.read(self.tokenfilename)
+
+ self.domain = urlparse(uri)[1]
+ if self.domain not in self.tokenfile.sections():
+ self.tokenfile.add_section(self.domain)
+
+ @property
+ def value(self):
+ if self.tokenfile.has_option(self.domain, 'token'):
+ return self.tokenfile.get(self.domain, 'token')
+ else:
+ return None
+
+ @value.setter
+ def value(self, value):
+ if self.value != value:
+ if value is None:
+ self.tokenfile.remove_option(self.domain, 'token')
+ else:
+ self.tokenfile.set(self.domain, 'token', value)
+
+ with open(self.tokenfilename, 'wb') as tokenfile:
+ self.tokenfile.write(tokenfile)
+
+    def __eq__(self, other):
+        if isinstance(other, BugzillaToken):  # token vs. token
+            return self.value == other.value
+        return self.value == other  # token vs. raw value (str or None)
+
+ def __repr__(self):
+ return '<Bugzilla Token :: %s>' % (self.value)
+
+
+class BugzillaServerProxy(ServerProxy):
+
+ def __init__(self, uri, *args, **kwargs):
+ ServerProxy.__init__(self, uri, *args, **kwargs)
+ self.token = BugzillaToken(uri)
+
+ def clear_token(self):
+ self.token.value = None
+
+ def _ServerProxy__request(self, methodname, params):
+ if self.token.value is not None \
+ and len(params) == 1 and 'token' not in params[0]:
+ params[0]['token'] = self.token.value
+ ret = ServerProxy._ServerProxy__request(self, methodname, params)
+ if isinstance(ret, dict) and 'token' in ret.keys():
+ self.token.value = ret.get('token')
+ return ret
+
+
class RequestsTransport(Transport):
user_agent = 'Python/Bugzilla'
@@ -514,7 +572,7 @@ class BugzillaBase(object):
self._transport = RequestsTransport(
url, self._cookiejar, sslverify=self._sslverify)
self._transport.user_agent = self.user_agent
- self._proxy = ServerProxy(url, self._transport)
+ self._proxy = BugzillaServerProxy(url, self._transport)
self.url = url
diff --git a/tests/rw_functional.py b/tests/rw_functional.py
index 045c832..7e0a9aa 100644
--- a/tests/rw_functional.py
+++ b/tests/rw_functional.py
@@ -25,10 +25,12 @@ else:
import bugzilla
from bugzilla import Bugzilla
+from bugzilla.base import BugzillaToken
import tests
cf = os.path.expanduser("~/.bugzillacookies")
+tf = os.path.expanduser("~/.bugzillatoken")
def _split_int(s):
@@ -43,7 +45,7 @@ class BaseTest(unittest.TestCase):
bz = Bugzilla(url=self.url, cookiefile=None)
self.assertTrue(bz.__class__ is self.bzclass)
- def _testCookie(self):
+ def _testCookieOrToken(self):
cookiefile = cf
domain = urlparse(self.url)[1]
if os.path.exists(cookiefile):
@@ -51,8 +53,14 @@ class BaseTest(unittest.TestCase):
if domain in out:
return
- raise RuntimeError("%s must exist and contain domain '%s'" %
- (cookiefile, domain))
+ tokenfile = tf
+ if os.path.exists(tokenfile):
+ token = BugzillaToken(self.url)
+ if token.value is not None:
+ return
+
+ raise RuntimeError("%s or %s must exist and contain domain '%s'" %
+ (cookiefile, tokenfile, domain))
class RHPartnerTest(BaseTest):
@@ -80,7 +88,7 @@ class RHPartnerTest(BaseTest):
return not noprivs
- test1 = BaseTest._testCookie
+ test1 = BaseTest._testCookieOrToken
test2 = BaseTest._testBZClass
--
1.9.0