Source code for txpostgres.retrying

"""
Simple implementation of a retrying call.

This code is based on a snippet send to the twisted-python mailing list:

http://twistedmatrix.com/pipermail/twisted-python/2009-November/020818.html

as well as published as txretry:

https://github.com/fluidinfo/txretry

It has been modified to allow resetting the backoff iterator and not store a
list of past failures.
"""

import random

from twisted.internet import reactor, defer, task
from twisted.python import failure


[docs]def simpleBackoffIterator(initialDelay=1.0, maxDelay=3600, factor=2.7182818284590451, jitter=0.11962656472, maxRetries=10, now=True): """ Yields increasing timeout values between retries of a call. The default factor and jitter are taken from Twisted's :tm:`ReconnectingClientFactory <internet.protocol.ReconnectingClientFactory>`. :var initialDelay: Initial delay, in seconds. :vartype initialDelay: :class:`float` :var maxDelay: Maximum cap for the delay, if zero then no maximum is applied. :vartype maxDelay: :class:`float` :var factor: Multiplicative factor for increasing the delay. :vartype factor: :class:`float` :var jitter: Randomness factor to include when increasing the delay, to prevent stampeding. :vartype jitter: :class:`float` :var maxRetries: If non-zero, only yield so many values after exhausting the iterator. :vartype maxRetries: :class:`int` :var now: If the very first delay yielded should always be zero. :vartype now: :class:`bool` """ retries = 0 delay = initialDelay if now: retries += 1 yield 0.0 while not maxRetries or retries < maxRetries: retries += 1 delay = delay * factor if jitter: delay = random.normalvariate(delay, delay * jitter) if maxDelay: delay = min(delay, maxDelay) yield delay
[docs]class RetryingCall(object): """ Calls a function repeatedly, passing it args and keyword args. Failures are passed to a user-supplied failure testing function. If the failure is ignored, the function is called again after a delay whose duration is obtained from a user-supplied iterator. The start method (below) returns a :d:`Deferred` that fires with the eventual non-error result of calling the supplied function, or fires its errback if no successful result can be obtained before the delay backoff iterator raises :class:`StopIteration`. It is important to note the behaviour when the delay of any of the steps is zero. The function is the called synchronously, ie. control does not go back to the reactor between obtaining the delay from the iterator and calling the function if the iterator returns zero. The :meth:`.resetBackoff` method replaces the backoff iterator with another one and is useful to reset the delay if some phase of the process has succeeded and that makes the desirable initial delay different again. """ reactor = None def __init__(self, f, *args, **kw): if self.reactor is None: self.reactor = reactor self._f = f self._args = args self._kw = kw def _err(self, fail): if self.failure is None: self.failure = fail try: if not self.cancelled: fail = self._failureTester(fail) except: self._deferred.errback() else: if isinstance(fail, failure.Failure): self._deferred.errback(fail) else: self._call() def _call(self): try: delay = next(self._backoffIterator) except StopIteration: self._deferred.errback(self.failure) else: self._callWithDelay(delay) def _callWithDelay(self, delay): # if the delay is 0, call the function synchronously if not delay: self._inProgress = defer.maybeDeferred( self._f, *self._args, **self._kw) else: self._inProgress = task.deferLater( self.reactor, delay, self._f, *self._args, **self._kw) self._inProgress.addCallbacks(self._deferred.callback, self._err) def _cancel(self, d): self.cancelled = True self._inProgress.cancel()
[docs] def start(self, backoffIterator=None, failureTester=None): """ Start the call and retry it until it succeeds and fails. :param backoffIterator: A zero-argument callable that should return a iterator yielding reconnection delay periods. If :class:`None` then :func:`.simpleBackoffIterator` will be used. :type backoffIterator: callable :param failureTester: A one-argument callable that will be called with a :tm:`Failure <python.failure.Failure>` instance each time the function being retried fails. It should return :class:`None` if the call should be retried or a :tm:`Failure <python.failure.Failure>` if the retrying process should be stopped. If :class:`None` is used for this parameter, retrying will never stop until the backoff iterator is exhausted. :type failureTester: callable """ self.resetBackoff(backoffIterator) if failureTester is None: failureTester = lambda _: None self._failureTester = failureTester self._deferred = defer.Deferred(self._cancel) self._inProgress = None self.failure = None self.cancelled = False self._call() return self._deferred
[docs] def resetBackoff(self, backoffIterator=None): """ Replace the current backoff iterator with a new one. """ if backoffIterator is None: backoffIterator = simpleBackoffIterator() self._backoffIterator = iter(backoffIterator)