Module usage

Basic usage of the module is not very different from using Twisted’s adbapi:

from txpostgres import txpostgres

from twisted.internet import reactor
from twisted.python import log, util

# connect to the database
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')

# run the query and print the result
d.addCallback(lambda _: conn.runQuery('select tablename from pg_tables'))
d.addCallback(lambda result: util.println('All tables:', result))

# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())

# start the reactor to kick off connection estabilishing
reactor.run()

If you want you can use the Cursor class directly, with a interface closer to Psycopg. Note that using this method you have to make sure never to execute a query before the previous one finishes, as that would violate the PostgreSQL asynchronous protocol.

from txpostgres import txpostgres

from twisted.internet import reactor
from twisted.python import log, util

# define the libpq connection string and the query to use
connstr = 'dbname=postgres'
query = 'select tablename from pg_tables order by tablename'

# connect to the database
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')


def useCursor(cur):
    # execute a query
    d = cur.execute(query)
    # fetch the first row from the result
    d.addCallback(lambda _: cur.fetchone())
    # output it
    d.addCallback(lambda result: util.println('First table name:', result[0]))
    # and close the cursor
    return d.addCallback(lambda _: cur.close())

# create a cursor and use it
d.addCallback(lambda _: conn.cursor())
d.addCallback(useCursor)

# log any errors and stop the reactor
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())

# start the reactor to kick off connection estabilishing
reactor.run()

Using transactions

Every query executed by txpostgres is committed immediately. If you need to execute a series of queries in a transaction, use the runInteraction() method:

from txpostgres import txpostgres

from twisted.internet import reactor
from twisted.python import log

# connect to the database
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')


def interaction(cur):
    """
    A callable that will execute inside a transaction.
    """
    # the parameter is a txpostgres Cursor
    d = cur.execute('create table test(x integer)')
    d.addCallback(lambda _: cur.execute('insert into test values (%s)', (1, )))
    return d

# run the interaction, making sure that if the insert fails, the table won't be
# left behind created but empty
d.addCallback(lambda _: conn.runInteraction(interaction))

# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())

# start the reactor to kick off connection estabilishing
reactor.run()

Customising the connection and cursor factories

You might want to customise the way txpostgres creates connections and cursors to take advantage of Psycopg features like dictionary cursors. To do that, define a subclass of Connection and override connectionFactory or cursorFactory class attributes to use your custom code. Here’s an example of how to use dict cursors:

import psycopg2
import psycopg2.extras
from txpostgres import txpostgres

from twisted.internet import reactor
from twisted.python import log, util


def dict_connect(*args, **kwargs):
    kwargs['connection_factory'] = psycopg2.extras.DictConnection
    return psycopg2.connect(*args, **kwargs)


class DictConnection(txpostgres.Connection):
    connectionFactory = staticmethod(dict_connect)


# connect using the custom connection class
conn = DictConnection()
d = conn.connect('dbname=postgres')

# run a query and print the result
d.addCallback(lambda _: conn.runQuery('select * from pg_tablespace'))
# access the column by its name
d.addCallback(lambda result: util.println('All tablespace names:',
                                          [row['spcname'] for row in result]))

# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())

# start the reactor to kick off connection estabilishing
reactor.run()

Listening for database notifications

Being an asynchronous driver, txpostgres supports the PostgreSQL NOTIFY feature for sending asynchronous notifications to connections. Here is an example script that connects to the database and listens for notifications on the list channel. Every time a notification is received, it interprets the payload as part of the name of a table and outputs a list of tables with names containing that payload.

from txpostgres import txpostgres

from twisted.internet import reactor
from twisted.python import util


def outputResults(results, payload):
    print "Tables with `%s' in their name:" % payload
    for result in results:
        print result[0]


def observer(notify):
    if not notify.payload:
        print "No payload"
        return

    query = ("select tablename from pg_tables "
             "where tablename like '%%' || %s || '%%'")
    d = conn.runQuery(query, (notify.payload, ))
    d.addCallback(outputResults, notify.payload)


# connect to the database
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')

# add a NOTIFY observer
conn.addNotifyObserver(observer)
# start listening for NOTIFY events on the 'list' channel
d.addCallback(lambda _: conn.runOperation("listen list"))
d.addCallback(lambda _: util.println("Listening on the `list' channel"))

# process events until killed
reactor.run()

To try it execute the example code and then open another session using psql and try sending some NOTIFY events:

$ psql postgres
psql (9.1.2)
Type "help" for help.

postgres=> notify list, 'user';
NOTIFY
postgres=> notify list, 'auth';
NOTIFY

You should see the example program outputting lists of table names containing the payload:

$ python notify_example.py
Listening on the `list' channel
Tables with `user' in their name:
pg_user_mapping
Tables with `auth' in their name:
pg_authid
pg_auth_members

Automatic reconnection

The module includes provision for automatically reconnecting to the database in case the connection gets broken. To use it, pass a DeadConnectionDetector instance to Connection. You can customise the detector instance or subclass it to add custom logic. See the documentation for DeadConnectionDetector for details.

When a Connection is configured with a detector, it will automatically start the reconnection process whenever it encounters a certain class of errors indicative of a disconnect. See defaultDeathChecker() for more.

While the connection is down, all attempts to use it will result in immediate failures with ConnectionDead. This is to prevent sending additional queries down a link that’s known to be down.

Here’s an example of using automatic reconnection in txpostgres:

from txpostgres import txpostgres, reconnection

from twisted.internet import reactor, task


class LoggingDetector(reconnection.DeadConnectionDetector):

    def startReconnecting(self, f):
        print '[*] database connection is down (error: %r)' % f.value
        return reconnection.DeadConnectionDetector.startReconnecting(self, f)

    def reconnect(self):
        print '[*] reconnecting...'
        return reconnection.DeadConnectionDetector.reconnect(self)

    def connectionRecovered(self):
        print '[*] connection recovered'
        return reconnection.DeadConnectionDetector.connectionRecovered(self)


def result(res):
    print '-> query returned result: %s' % res


def error(f):
    print '-> query failed with %r' % f.value


def connectionError(f):
    print '-> connecting failed with %r' % f.value


def runLoopingQuery(conn):
    d = conn.runQuery('select 1')
    d.addCallbacks(result, error)


def connected(_, conn):
    print '-> connected, running a query periodically'
    lc = task.LoopingCall(runLoopingQuery, conn)
    return lc.start(2)


# connect to the database using reconnection
conn = txpostgres.Connection(detector=LoggingDetector())
d = conn.connect('dbname=postgres')

# if the connection failed, log the error and start reconnecting
d.addErrback(conn.detector.checkForDeadConnection)
d.addErrback(connectionError)
d.addCallback(connected, conn)

# process events until killed
reactor.run()

You can run this snippet and then try restarting the database. Logging lines should appear, as the connection gets automatically recovered.

Choosing a Psycopg implementation

To use txpostgres, you will need a recent enough version of Psycopg, namely 2.2.0 or later. Since parts of Psycopg are written in C, it is not available on some Python implementations, like PyPy. When first imported, txpostgres will try to detect if an API-compatible implementation of Psycopg is available.

You can force a certain implementation to be used by exporing an environment variable TXPOSTGRES_PSYCOPG_IMPL. Recognized values are:

psycopg2
Force using Psycopg, do not try any fallbacks.
psycopg2cffi
Use psycopg2cffi, a psycopg2 implementation based on cffi, known to work on PyPy.
psycopg2ct
Use psycopg2ct, an older psycopg2 implementation using ctypes, also compatible with PyPy.