Source code for txpostgres.reconnection

Reconnection support for txpostgres.
from __future__ import absolute_import

from twisted.internet import defer
from twisted.python import log

from txpostgres import retrying
from txpostgres.psycopg2_impl import psycopg2
from txpostgres.txpostgres import RollbackFailed

[docs]class ConnectionDead(Exception): """ The connection is dead. """
[docs]def defaultDeathChecker(f): """ Checker function suitable for use with :class:`.DeadConnectionDetector`. """ return f.check(psycopg2.InterfaceError, psycopg2.OperationalError, RollbackFailed)
[docs]def defaultReconnectionIterator(): """ A function returning sane defaults for a reconnection iterator, for use with :class:`.DeadConnectionDetector`. The defaults have maximum reconnection delay capped at 10 seconds and no limit on the number of retries. """ return retrying.simpleBackoffIterator( initialDelay=1.0, maxDelay=10.0, factor=1.7, maxRetries=0, now=True)
[docs]class DeadConnectionDetector(object): """ A class implementing reconnection strategy. When the connection is discovered to be dead, it will start the reconnection process. The object being reconnected should proxy all operations through the detector's :meth:`.callChecking` which will automatically fail them if the connection is currently dead. This is done to prevent sending requests to a resource that's not currently available. When an instance of :class:`~txpostgres.txpostgres.Connection` is passed a :class:`.DeadConnectionDetector` it automatically starts using it to provide reconnection. Another way of using this class is manually calling :meth:`.checkForDeadConnection` passing it a :tm:`Failure <python.failure.Failure>` instance to trigger reconnection. This is useful to handle initial connection errors, for example:: conn = txpostgres.Connection(detector=DeadConnectionDetector()) d = conn.connect('dbname=test') d.addErrback(conn.detector.checkForDeadConnection) :var reconnectable: An object to be reconnected. It should provide a `connect` and a `close` method. :vartype reconnectable: :class:`object` :var connectionIsDead: If the connection is currently believed to be dead. :vartype connectionIsDead: :class:`bool` """ reconnectable = None connectionIsDead = None def __init__(self, deathChecker=None, reconnectionIterator=None, reactor=None): """ Create a new detector. :param deathChecker: A one-argument callable that will be called with a failure instance and should return True if reconnection should be triggered. If :class:`None` then :func:`.defaultDeathChecker` will be used. :type deathChecker: callable :param reconnectionIterator: A zero-argument callable that should return a iterator yielding reconnection delay periods. If :class:`None` then :func:`.defaultReconnectionIterator` will be used. :type reconnectionIterator: callable :param reactor: A Twisted reactor or :class:`None`, which means the current reactor. """ self.deathChecker = deathChecker or defaultDeathChecker self.reconnectionIterator = (reconnectionIterator or defaultReconnectionIterator) if not reactor: from twisted.internet import reactor self.reactor = reactor self.connectionIsDead = False self._recoveryHandlers = set()
[docs] def setReconnectable(self, reconnectable, *connargs, **connkw): """ Register a reconnectable with the detector. Needs to be called before the detector will be used. The remaining arguments will be passed to the reconnectable's `connect` method on each reconnection. :param reconnectable: An object to be reconnected. It should provide a `connect` and a `close` method. :type reconnectable: :class:`object` """ self.reconnectable = reconnectable self._connargs = connargs self._connkw = connkw
[docs] def callChecking(self, method, *args, **kwargs): """ Call a method if the connection is still alive. """ # the connection is already dead and a reconnect is underway if self.connectionIsDead: return # call the method and check if the connection died d = defer.maybeDeferred(method, *args, **kwargs) return d.addErrback(self.checkForDeadConnection)
[docs] def checkForDeadConnection(self, f): """ Get passed a :tm:`Failure <python.failure.Failure>` instance and determine if it means that the connection is dead. If so, start reconnecting. """ # if the error does not indicate that the connection is dead, just # return the failure if not self.deathChecker(f): return f # if we already know that the connection is dead, we just need to wait if self.connectionIsDead: return f # we detected that the connection died, start the reconnection process self.connectionIsDead = True self.startReconnecting(f) # return the original failure, we never want to swallow errors return f
[docs] def startReconnecting(self, f): """ Called when the connection is detected to be dead. """ # set up a retrying reconnecting call and start it rc = retrying.RetryingCall(self.reconnect) rc.reactor = self.reactor d = rc.start(self.reconnectionIterator()) d.addCallback(lambda _: self.connectionRecovered()) # the reconnection should never fail (it doesn't with the default # iterator), but buggy recovery handlers and custom iterators might # cause that, so just log the error and swallow it d.addErrback(log.err)
[docs] def reconnect(self): """ Called on each attempt of reconnection. """ # if the connection is down even closing it might cause error, but # then they should be safe to ignore (probably it's already closed) try: self.reconnectable.close() except: pass # reuse the stored connection arguments return self.reconnectable.connect( *self._connargs, **self._connkw)
[docs] def connectionRecovered(self): """ Called when the connection has recovered. """ self.connectionIsDead = False dl = [] for handler in self.getRecoveryHandlers(): d = defer.maybeDeferred(handler) d.addErrback(log.err) dl.append(d) return defer.gatherResults(dl)
[docs] def addRecoveryHandler(self, handler): """ Add a handler function that will get called whenever the connection is recovered. Any number of handlers can be added. Adding a handler that's already been added is ignored. Recovery handlers are ran in parallel. If any of them return a :d:`Deferred`, recovery will wait until it fires. There are no guarantees as to the order in which handler functions are called. Exceptions in handlers are logged and discarded. :param handler: A zero-argument callable. """ self._recoveryHandlers.add(handler)
[docs] def removeRecoveryHandler(self, handler): """ Remove a previously added recovery handler. Removing a handler that's never been added will be ignored. :param handler: A callable that should no longer be called when the connection recovers. """ self._recoveryHandlers.discard(handler)
[docs] def getRecoveryHandlers(self): """ Get the currently registered recovery handlers. :return: A set of callables that will get called on recovery. :rtype: :class:`set` """ return set(self._recoveryHandlers)