API documentation

All txpostgres APIs are documented here.

txpostgres.txpostgres

class txpostgres.txpostgres.Connection(reactor=None, cooperator=None, detector=None)[source]

Bases: txpostgres.txpostgres._PollingMixin

A wrapper for a psycopg2 asynchronous connection.

The wrapper forwards almost everything to the wrapped connection, but provides additional methods for compatibility with adbapi.Connection.

Parameters:
  • reactor – A Twisted reactor or None, which means the current reactor
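  • cooperator – A Twisted Cooperator to process NOTIFY events or None, which means using task.cooperate
  • detector – A DeadConnectionDetector used to provide reconnection or None, which means no automatic reconnection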
Variables:
  • connectionFactory (any callable) – The factory used to produce connections, defaults to psycopg2.connect
  • cursorFactory (a callable accepting two positional arguments, a psycopg2.cursor and a Connection) – The factory used to produce cursors, defaults to Cursor
connect(*args, **kwargs)[source]

Connect to the database.

Any arguments will be passed to connectionFactory. Use them to pass database names, usernames, passwords, etc.

Returns:A Deferred that will fire when the connection is open.
Raise:AlreadyConnected when the connection has already been opened.
close()[source]

Close the connection and disconnect from the database.

Returns:None
cursor()[source]

Create an asynchronous cursor using cursorFactory.

runQuery(*args, **kwargs)[source]

Execute an SQL query and return the result.

An asynchronous cursor will be created and its execute() method will be invoked with the provided arguments. After the query completes the results will be fetched and the returned Deferred will fire with the result.

The connection is always in autocommit mode, so the query will be run in a one-off transaction. If an error occurs, the returned Deferred will errback with a Failure.

It is safe to call this method multiple times without waiting for the first query to complete.

Returns:A Deferred that will fire with the return value of the cursor’s fetchall() method.
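A minimal sketch of calling runQuery(), assuming conn is an already connected Connection. The helper name fetch_user_names and the users table with its name and age columns are hypothetical. The arguments are handed to the cursor's execute(), so psycopg2-style %s placeholders are used.

```python
def fetch_user_names(conn, min_age):
    """Return a Deferred firing with a list of names (hypothetical schema)."""
    # Arguments are forwarded to the cursor's execute(): query, then params.
    d = conn.runQuery("select name from users where age >= %s", (min_age,))
    # The Deferred fires with fetchall()'s return value, a list of row tuples.
    d.addCallback(lambda rows: [row[0] for row in rows])
    return d
```

Because the connection serializes queries itself, several such calls can be issued back to back without waiting for each other.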
runOperation(*args, **kwargs)[source]

Execute an SQL query and discard the result.

Identical to runQuery(), but the result won’t be fetched and the returned Deferred will fire with None instead. It is intended for statements that do not normally return values, like INSERT or DELETE.

It is safe to call this method multiple times without waiting for the first query to complete.

Returns:A Deferred that will fire with None.
runInteraction(interaction, *args, **kwargs)[source]

Run commands in a transaction and return the result.

interaction should be a callable that will be passed a Cursor object. A new transaction is started before interaction is called, so the callable can assume that all its commands run inside a transaction. If interaction returns a Deferred, processing will wait for it to fire before proceeding. You should not close the provided Cursor.

After interaction finishes its work, the transaction will be automatically committed. If it raises an exception or returns a Failure, the transaction will be rolled back instead.

If committing the transaction fails, it will be rolled back instead and the failure obtained while trying to commit will be returned.

If rolling back the transaction fails, the failure obtained from the rollback attempt will be logged and a RollbackFailed failure will be returned. The returned failure will contain references to the original failure that caused the transaction to be rolled back and to the Connection in which that happened, so the caller can decide whether to keep using the connection or close it, since a transaction might have been left open in the database.

It is safe to call this method multiple times without waiting for the first interaction to complete.

Parameters:interaction (any callable) – A callable whose first argument is a Cursor.
Returns:A Deferred that will fire with the return value of interaction.
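The following sketch shows what an interaction might look like, assuming conn is a connected Connection. The accounts table and the names transfer and run_transfer are hypothetical. It also assumes that the extra arguments given to runInteraction() are passed on to the callable after the Cursor, as the signature suggests.

```python
def transfer(cur, source, target, amount):
    """Interaction: move `amount` between two rows of a hypothetical table."""
    # The first argument is the Cursor supplied by runInteraction().
    d = cur.execute(
        "update accounts set balance = balance - %s where id = %s",
        (amount, source))
    # Chain the second statement so it only starts after the first finishes.
    d.addCallback(lambda _: cur.execute(
        "update accounts set balance = balance + %s where id = %s",
        (amount, target)))
    # Returning the Deferred makes the commit wait for both statements.
    return d


def run_transfer(conn, source, target, amount):
    # Assumption: arguments after the callable are handed to the interaction.
    return conn.runInteraction(transfer, source, target, amount)
```

If either statement fails, the Deferred returned by the interaction errbacks and both updates are rolled back together.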
cancel(d)[source]

Cancel the current operation. The cancellation does not happen immediately, because the PostgreSQL protocol requires the application to wait for confirmation after the query has been cancelled. Be careful when cancelling an interaction: if the interaction sends multiple queries to the database server, you can’t be sure which one you are cancelling.

Parameters:d – A Deferred returned by one of the Connection methods.
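One possible use of cancel() is a query timeout. The sketch below assumes conn is a connected Connection and reactor is a Twisted reactor providing callLater(). The helper name run_with_timeout is hypothetical, and how the cancelled Deferred errbacks is left to the caller to handle.

```python
def run_with_timeout(conn, reactor, query, timeout):
    """Run `query` and ask the server to cancel it after `timeout` seconds."""
    d = conn.runQuery(query)
    # Schedule the cancellation; conn.cancel() takes the Deferred to cancel.
    timer = reactor.callLater(timeout, conn.cancel, d)

    def stop_timer(result):
        # The query finished (or failed) first, so the timer is not needed.
        if timer.active():
            timer.cancel()
        return result

    d.addBoth(stop_timer)
    return d
```

Since cancellation is confirmed by the server asynchronously, the returned Deferred may fire some time after the timeout has elapsed.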
cursorRunning(cursor)[source]

Called automatically when a Cursor created by this Connection starts polling after executing a query. User code should never have to call this method.

cursorFinished(cursor)[source]

Called automatically when a Cursor created by this Connection is done with polling after executing a query. User code should never have to call this method.

checkForNotifies()[source]

Check if NOTIFY events have been received and if so, dispatch them to the registered observers, using the Cooperator provided in the constructor. This is done automatically; user code should never need to call this method.

addNotifyObserver(observer)[source]

Add an observer function that will get called whenever a NOTIFY event is delivered to this connection. Any number of observers can be added to a connection. Adding an observer that’s already been added is ignored.

Observer functions are processed using the Cooperator provided in the constructor to avoid blocking the reactor thread when processing large numbers of events. If an observer returns a Deferred, processing waits until it fires or errbacks.

There are no guarantees as to the order in which observer functions are called when NOTIFY events are delivered. Exceptions in observers are logged and discarded.

Parameters:observer (any callable) – A callable whose first argument is a psycopg2.extensions.Notify.
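A sketch of subscribing to a channel, assuming conn is a connected Connection. The helper name listen and the handle_payload callback are hypothetical. The observer relies on the pid, channel and payload attributes of psycopg2.extensions.Notify and filters by channel, because observers are called for every NOTIFY delivered to the connection.

```python
def listen(conn, channel, handle_payload):
    """Subscribe to `channel` and pass each payload to `handle_payload`."""
    def observer(notify):
        # notify is a psycopg2.extensions.Notify with pid, channel, payload.
        if notify.channel == channel:
            handle_payload(notify.payload)

    conn.addNotifyObserver(observer)
    # LISTEN takes an identifier, not a bind parameter, so `channel` must be
    # a trusted name; it is interpolated directly here.
    d = conn.runOperation("listen " + channel)
    # Hand the observer back so it can later be given to removeNotifyObserver().
    d.addCallback(lambda _: observer)
    return d
```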
removeNotifyObserver(observer)[source]

Remove a previously added observer function. Removing an observer that’s never been added will be ignored.

Parameters:observer (any callable) – A callable that should no longer be called on NOTIFY events.
getNotifyObservers()[source]

Get the currently registered notify observers.

Returns:A set of callables that will get called on NOTIFY events.
Return type:set
class txpostgres.txpostgres.Cursor(cursor, connection)[source]

Bases: txpostgres.txpostgres._PollingMixin

A wrapper for a psycopg2 asynchronous cursor.

The wrapper will forward almost everything to the wrapped cursor, so the usual DB-API interface can be used, but it will return Deferred objects that will fire with the DB-API results.

Remember that the PostgreSQL protocol does not support concurrent execution of asynchronous queries, so you need to take care not to execute a query while another is still being processed.

In most cases you should just use the Connection methods that will handle the locking necessary to prevent concurrent query execution.

execute(query, params=None)[source]

A regular DB-API execute, but returns a Deferred.

The caller must be careful not to call this method twice on cursors from the same connection without waiting for the previous execution to complete.

Returns:A Deferred that will fire with the results of the DB-API execute.
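When a Cursor is used directly, the Connection's locking is bypassed, so queries have to be chained by hand. The sketch below assumes conn is a connected Connection that is not running anything else at the time. The helper name count_then_list and the users table are hypothetical. It relies on fetchone() and fetchall() being forwarded to the wrapped psycopg2 cursor.

```python
def count_then_list(conn):
    """Run two queries one after another on a single cursor (sketch)."""
    cur = conn.cursor()
    results = {}

    def store_count(_):
        # fetchone()/fetchall() are forwarded to the wrapped psycopg2 cursor.
        results["count"] = cur.fetchone()[0]
        # Only now is it safe to start the next query on this connection.
        return cur.execute("select name from users order by name")

    def store_names(_):
        results["names"] = [row[0] for row in cur.fetchall()]
        cur.close()
        return results

    d = cur.execute("select count(*) from users")
    d.addCallback(store_count)
    d.addCallback(store_names)
    return d
```

The second execute() is only issued from a callback of the first, which is what keeps the two queries from overlapping.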
callproc(procname, params=None)[source]

A regular DB-API callproc, but returns a Deferred.

The caller must be careful not to call this method twice on cursors from the same connection without waiting for the previous execution to complete.

Returns:A Deferred that will fire with the results of the DB-API callproc.
close()[source]

Close the cursor.

Once closed, the cursor cannot be used again.

Returns:None
class txpostgres.txpostgres.ConnectionPool(_ignored, *connargs, **connkw)[source]

Bases: object

A poor man’s pool of Connection instances.

Variables:
  • min (int) – The number of connections that will be open when start() is called. The pool never opens or closes connections on its own after starting. Defaults to 3.
  • connectionFactory (any callable) – The factory used to produce connections, defaults to Connection.
  • reactor – The reactor passed to connectionFactory.
  • cooperator – The cooperator passed to connectionFactory.
__init__(_ignored, *connargs, **connkw)[source]

Create a new connection pool.

All positional and keyword arguments, except the first positional argument and the min keyword argument, are passed to connectionFactory when connecting. Use these arguments to pass database names, usernames, passwords, etc.

Parameters:_ignored (any object) – Ignored, for adbapi.ConnectionPool compatibility.
start()[source]

Start the connection pool.

This will create as many connections as the pool’s min variable says.

Returns:A Deferred that fires when all connections have succeeded.
close()[source]

Stop the pool.

Disconnects all connections.

Returns:None
remove(connection)[source]

Remove a connection from the pool.

Provided so that broken connections can be removed from the pool. The caller should make sure the removed connection does not have queries pending.

Parameters:connection (an object produced by the pool’s connectionFactory) – The connection to be removed.
add(connection)[source]

Add a connection to the pool.

Provided so that the pool can be extended with new connections.

Parameters:connection (an object compatible with those produced by the pool’s connectionFactory) – The connection to be added.
runQuery(*args, **kwargs)[source]

Execute an SQL query using a pooled connection and return the result.

One of the pooled connections will be chosen, its runQuery() method will be called and the resulting Deferred will be returned.

Returns:A Deferred obtained by a pooled connection’s runQuery()
runOperation(*args, **kwargs)[source]

Execute an SQL query using a pooled connection and discard the result.

One of the pooled connections will be chosen, its runOperation() method will be called and the resulting Deferred will be returned.

Returns:A Deferred obtained by a pooled connection’s runOperation()
runInteraction(interaction, *args, **kwargs)[source]

Run commands in a transaction using a pooled connection and return the result.

One of the pooled connections will be chosen, its runInteraction() method will be called and the resulting Deferred will be returned.

Parameters:interaction (any callable) – A callable that will be passed to Connection.runInteraction
Returns:A Deferred obtained by a pooled connection’s Connection.runInteraction
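The pool's lifecycle could be sketched as follows, assuming pool is a ConnectionPool that has not been started yet. The helper name pool_roundtrip is hypothetical.

```python
def pool_roundtrip(pool):
    """Start `pool`, run one query through it, then close it (sketch)."""
    # start() opens `min` connections; its Deferred fires once all are up.
    d = pool.start()
    # Any pooled connection may serve the query.
    d.addCallback(lambda _: pool.runQuery("select 1"))

    def shutdown(result):
        # close() disconnects every pooled connection and returns None.
        pool.close()
        return result

    # addBoth so the pool is closed whether the query succeeded or failed.
    d.addBoth(shutdown)
    return d
```

Going by the constructor description above, such a pool would presumably be created with something like ConnectionPool(None, 'dbname=test', min=2), where the first argument is ignored and the rest is used when connecting.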
class txpostgres.txpostgres._PollingMixin[source]

Bases: object

An object that wraps something pollable. It can take care of waiting for the wrapped pollable to reach the OK state and adapts the pollable’s interface to interfaces.IReadWriteDescriptor. It will forward all attribute access that has not been wrapped to the underlying pollable. Useful as a mixin for classes that wrap a psycopg2 pollable object.

Variables:
  • reactor (an IReactorFDSet provider) – The reactor that the class will use to wait for the wrapped pollable to reach the OK state.
  • prefix (str) – Prefix used during log formatting to indicate context.
pollable()[source]

Return the pollable object. Subclasses should override this.

Returns:A psycopg2 pollable.
poll()[source]

Start polling the wrapped pollable.

Returns:A Deferred that will fire with an instance of this class when the pollable reaches the OK state.
continuePolling(swallowErrors=False)[source]

Move forward in the poll cycle. This will call psycopg2’s poll() on the wrapped pollable and either wait for more I/O or callback or errback the Deferred returned earlier if the polling cycle has been completed.

Parameters:swallowErrors (bool) – Whether errors that have no one to be reported to should be ignored.
Raise:UnexpectedPollResult when poll() returns a result from outside of the expected list.
exception txpostgres.txpostgres.AlreadyConnected[source]

Bases: exceptions.Exception

The database connection is already open.

exception txpostgres.txpostgres.RollbackFailed(connection, originalFailure)[source]

Bases: exceptions.Exception

Rolling back the transaction failed; the connection might be in an unusable state.

Variables:
  • connection (Connection) – The connection that failed to roll back its transaction.
  • originalFailure (a Twisted Failure) – The failure that caused the connection to try to roll back the transaction.
exception txpostgres.txpostgres.UnexpectedPollResult[source]

Bases: exceptions.Exception

Polling returned an unexpected result.

exception txpostgres.txpostgres.AlreadyPolling[source]

Bases: exceptions.Exception

The previous poll cycle has not been finished yet.

This probably indicates an issue in txpostgres, rather than in user code.

txpostgres.reconnection

class txpostgres.reconnection.DeadConnectionDetector(deathChecker=None, reconnectionIterator=None, reactor=None)[source]

Bases: object

A class implementing a reconnection strategy. When the connection is discovered to be dead, it will start the reconnection process.

The object being reconnected should proxy all operations through the detector’s callChecking() which will automatically fail them if the connection is currently dead. This is done to prevent sending requests to a resource that’s not currently available.

When an instance of Connection is passed a DeadConnectionDetector it automatically starts using it to provide reconnection.

Another way of using this class is manually calling checkForDeadConnection() passing it a Failure instance to trigger reconnection. This is useful to handle initial connection errors, for example:

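from txpostgres import txpostgres
from txpostgres.reconnection import DeadConnectionDetector
conn = txpostgres.Connection(detector=DeadConnectionDetector())
d = conn.connect('dbname=test')
# hand initial connection errors to the detector to trigger reconnection
d.addErrback(conn.detector.checkForDeadConnection)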
Variables:
  • reconnectable (object) – An object to be reconnected. It should provide a connect and a close method.
  • connectionIsDead (bool) – If the connection is currently believed to be dead.
setReconnectable(reconnectable, *connargs, **connkw)[source]

Register a reconnectable with the detector. Must be called before the detector is used. The remaining arguments will be passed to the reconnectable’s connect method on each reconnection.

Parameters:reconnectable (object) – An object to be reconnected. It should provide a connect and a close method.
callChecking(method, *args, **kwargs)[source]

Call a method if the connection is still alive.

checkForDeadConnection(f)[source]

Get passed a Failure instance and determine if it means that the connection is dead. If so, start reconnecting.

startReconnecting(f)[source]

Called when the connection is detected to be dead.

reconnect()[source]

Called on each reconnection attempt.

connectionRecovered()[source]

Called when the connection has recovered.

addRecoveryHandler(handler)[source]

Add a handler function that will get called whenever the connection is recovered. Any number of handlers can be added. Adding a handler that’s already been added is ignored.

Recovery handlers are run in parallel. If any of them returns a Deferred, recovery will wait until it fires.

There are no guarantees as to the order in which handler functions are called. Exceptions in handlers are logged and discarded.

Parameters:handler – A zero-argument callable.
removeRecoveryHandler(handler)[source]

Remove a previously added recovery handler. Removing a handler that’s never been added will be ignored.

Parameters:handler – A callable that should no longer be called when the connection recovers.
getRecoveryHandlers()[source]

Get the currently registered recovery handlers.

Returns:A set of callables that will get called on recovery.
Return type:set
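A typical reason to register a recovery handler is to restore per-session state, such as LISTEN registrations, which PostgreSQL drops when a session ends. The sketch below assumes conn is a Connection and detector is the DeadConnectionDetector it was created with. The helper name relisten_on_recovery is hypothetical.

```python
def relisten_on_recovery(conn, detector, channel):
    """Re-issue LISTEN each time `detector` reports a recovered connection."""
    def handler():
        # Recovery handlers take no arguments; returning the Deferred makes
        # recovery wait until the LISTEN has completed.
        return conn.runOperation("listen " + channel)

    detector.addRecoveryHandler(handler)
    # Returned so the caller can pass it to removeRecoveryHandler() later.
    return handler
```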
txpostgres.reconnection.defaultDeathChecker(f)[source]

Checker function suitable for use with DeadConnectionDetector.

txpostgres.reconnection.defaultReconnectionIterator()[source]

A function returning sane defaults for a reconnection iterator, for use with DeadConnectionDetector.

The defaults have maximum reconnection delay capped at 10 seconds and no limit on the number of retries.

exception txpostgres.reconnection.ConnectionDead[source]

Bases: exceptions.Exception

The connection is dead.

txpostgres.retrying

class txpostgres.retrying.RetryingCall(f, *args, **kw)[source]

Bases: object

Calls a function repeatedly, passing it args and keyword args. Failures are passed to a user-supplied failure testing function. If the failure is ignored, the function is called again after a delay whose duration is obtained from a user-supplied iterator. The start method (below) returns a Deferred that fires with the eventual non-error result of calling the supplied function, or fires its errback if no successful result can be obtained before the delay backoff iterator raises StopIteration.

It is important to note the behaviour when the delay of any of the steps is zero. The function is then called synchronously, i.e. control does not go back to the reactor between obtaining the delay from the iterator and calling the function if the iterator returns zero.

The resetBackoff() method replaces the backoff iterator with another one. It is useful for resetting the delay when some phase of the process has succeeded and a different initial delay becomes desirable.

start(backoffIterator=None, failureTester=None)[source]

Start the call and retry it until it succeeds or fails.

Parameters:
  • backoffIterator (callable) – A zero-argument callable that should return an iterator yielding reconnection delay periods. If None then simpleBackoffIterator() will be used.
  • failureTester (callable) – A one-argument callable that will be called with a Failure instance each time the function being retried fails. It should return None if the call should be retried or a Failure if the retrying process should be stopped. If None is used for this parameter, retrying will never stop until the backoff iterator is exhausted.
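A failureTester might look like the sketch below. The function name is hypothetical and the choice of retryable exceptions is only an assumption for illustration; for database calls a check against psycopg2.OperationalError may be more appropriate. It relies on Failure.check(), which returns the matching exception class or None.

```python
def retry_on_network_errors(failure):
    """failureTester sketch: retry network-style errors, stop on others."""
    # Failure.check() returns the matching exception class, or None.
    if failure.check(ConnectionError, TimeoutError):
        return None      # None means "retry after the next backoff delay"
    return failure       # returning the Failure stops the retrying process
```

It would be passed as RetryingCall(f).start(failureTester=retry_on_network_errors), where f stands for whatever function is being retried.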
resetBackoff(backoffIterator=None)[source]

Replace the current backoff iterator with a new one.

txpostgres.retrying.simpleBackoffIterator(initialDelay=1.0, maxDelay=3600, factor=2.718281828459045, jitter=0.11962656472, maxRetries=10, now=True)[source]

Yields increasing timeout values between retries of a call. The default factor and jitter are taken from Twisted’s ReconnectingClientFactory.

Variables:
  • initialDelay (float) – Initial delay, in seconds.
  • maxDelay (float) – Maximum cap for the delay, if zero then no maximum is applied.
  • factor (float) – Multiplicative factor for increasing the delay.
  • jitter (float) – Randomness factor to include when increasing the delay, to prevent stampeding.
  • maxRetries (int) – If non-zero, yield at most this many values, after which the iterator is exhausted.
  • now (bool) – If the very first delay yielded should always be zero.
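To illustrate how these parameters interact, here is a standalone sketch of an exponential backoff generator. It is not the library's implementation: the name backoff_delays is hypothetical, the defaults for factor and jitter are simplified for predictability, and it assumes that the initial zero delay does not count towards max_retries.

```python
import random


def backoff_delays(initial_delay=1.0, max_delay=3600.0, factor=2.0,
                   jitter=0.0, max_retries=10, now=True):
    """Yield retry delays in seconds; a sketch, not the library's code."""
    if now:
        # Optional immediate first attempt (assumed not to count as a retry).
        yield 0.0
    delay = initial_delay
    retries = 0
    while max_retries == 0 or retries < max_retries:
        yield delay
        retries += 1
        # Grow the delay multiplicatively for the next attempt.
        delay *= factor
        if jitter:
            # Spread delays randomly so many clients do not retry in lockstep.
            delay = random.normalvariate(delay, delay * jitter)
        if max_delay:
            # A zero max_delay means no cap is applied.
            delay = min(delay, max_delay)
```

With jitter disabled the sequence is deterministic: list(backoff_delays(max_delay=5.0, max_retries=4)) gives [0.0, 1.0, 2.0, 4.0, 5.0].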