testgres package

testgres.api

Testing framework for PostgreSQL and its extensions

This module was inspired by the Postgres TAP test framework (the PostgresNode.pm module). It can manage Postgres clusters: initialize them, edit configuration files, start and stop them, and execute queries. A typical flow may look like this:

>>> with get_new_node() as node:
...     node.init().start()
...     result = node.safe_psql('postgres', 'select 1')
...     print(result.decode('utf-8').strip())
...     node.stop()
PostgresNode(name='...', port=..., base_dir='...')
1
PostgresNode(name='...', port=..., base_dir='...')
Or:
>>> with get_new_node() as master:
...     master.init().start()
...     with master.backup() as backup:
...         with backup.spawn_replica() as replica:
...             replica = replica.start()
...             master.execute('postgres', 'create table test (val int4)')
...             master.execute('postgres', 'insert into test values (0), (1), (2)')
...             replica.catchup()  # wait until changes are visible
...             print(replica.execute('postgres', 'select count(*) from test'))
PostgresNode(name='...', port=..., base_dir='...')
[(3,)]
testgres.api.get_new_node(name=None, base_dir=None, **kwargs)

Simply a wrapper around PostgresNode constructor. See PostgresNode.__init__() for details.

testgres.backup

class testgres.backup.NodeBackup(node, base_dir=None, username=None, xlog_method=<XLogMethod.fetch: 'fetch'>)

Bases: object

Smart object responsible for backups

cleanup()

Remove all files that belong to this backup. No-op if it’s been converted to a PostgresNode (destroy=True).

log_file
spawn_primary(name=None, destroy=True)

Create a primary node from a backup.

Parameters:
  • name – primary’s application name.
  • destroy – should we convert this backup into a node?
Returns:

New instance of PostgresNode.

spawn_replica(name=None, destroy=True, slot=None)

Create a replica of the original node from a backup.

Parameters:
  • name – replica’s application name.
  • slot – create a replication slot with the specified name.
  • destroy – should we convert this backup into a node?
Returns:

New instance of PostgresNode.

testgres.config

class testgres.config.GlobalConfig(**options)

Bases: object

Global configuration object which allows user to override default settings.

copy()

Return a copy of this object.

items()

Return setting-value pairs.

keys()

Return a list of all available settings.

update(config)

Extract setting-value pairs from ‘config’ and assign those values to corresponding settings of this GlobalConfig object.

cache_initdb = True

shall we use cached initdb instance?

cache_pg_config = True

shall we cache pg_config results?

cached_initdb_dir

path to a temp directory for cached initdb.

cached_initdb_unique = False

shall we give new node a unique system id?

error_log_lines = 20

Number of log lines to be shown in exceptions (0 = unlimited).

node_cleanup_full = True

shall we remove EVERYTHING (including logs)?

node_cleanup_on_bad_exit = False

remove base_dir on __exit__() via exception.

node_cleanup_on_good_exit = True

remove base_dir on nominal __exit__().

temp_dir

path to temp dir containing nodes with default ‘base_dir’.

use_python_logging = False

enable python logging subsystem (see logger.py).

testgres.config.configure_testgres(**options)

Adjust current global options. Look at the GlobalConfig to learn about existing settings.

testgres.config.pop_config()

Restore the previous GlobalConfig options from the config stack.

testgres.config.push_config(**options)

Permanently set custom GlobalConfig options and put previous settings on top of the config stack.

testgres.config.scoped_config(**options)

Temporarily set custom GlobalConfig options for this context. Previous options are pushed to the config stack.

Example

>>> from .api import get_new_node
>>> with scoped_config(cache_initdb=False):
...     # create a new node with fresh initdb
...     with get_new_node().init().start() as node:
...         print(node.execute('select 1'))
[(1,)]

testgres.connection

class testgres.connection.NodeConnection(node, dbname=None, username=None, password=None, autocommit=False)

Bases: object

Transaction wrapper returned by Node

begin(isolation_level=<IsolationLevel.ReadCommitted: 'read committed'>)
close()
commit()
connection
cursor
execute(query, *args)
node
pid
rollback()
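
As a rough illustration of how the methods listed above fit together, the sketch below wraps several statements in one transaction. `run_in_transaction` is a hypothetical helper, and `_FakeNode` / `_FakeConnection` are stand-ins that only record calls so the example runs without a PostgreSQL server; with testgres, a PostgresNode would be passed instead.

```python
def run_in_transaction(node, statements, dbname=None):
    """Run all statements in one transaction; roll back if any fails."""
    con = node.connect(dbname=dbname)
    try:
        con.begin()
        results = [con.execute(stmt) for stmt in statements]
        con.commit()
        return results
    except Exception:
        con.rollback()
        raise
    finally:
        con.close()


class _FakeConnection:
    """Records method calls; a stand-in, NOT part of testgres."""

    def __init__(self):
        self.calls = []

    def begin(self):
        self.calls.append("begin")

    def execute(self, query, *args):
        self.calls.append("execute")
        if query == "boom":
            raise RuntimeError("simulated failure")
        return [(1,)]

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


class _FakeNode:
    """Hands out one fake connection; a stand-in, NOT part of testgres."""

    def __init__(self):
        self.con = _FakeConnection()

    def connect(self, dbname=None, username=None, password=None, autocommit=False):
        return self.con


good = _FakeNode()
rows = run_in_transaction(good, ["select 1"])

bad = _FakeNode()
try:
    run_in_transaction(bad, ["select 1", "boom"])
except RuntimeError:
    pass  # the failure is re-raised after rollback
```

The connection is closed in `finally`, so it is released on both the commit path and the rollback path.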

testgres.enums

class testgres.enums.DumpFormat

Bases: enum.Enum

Available dump formats

Custom = 'custom'
Directory = 'directory'
Plain = 'plain'
Tar = 'tar'
class testgres.enums.IsolationLevel

Bases: enum.Enum

Transaction isolation level for NodeConnection

ReadCommitted = 'read committed'
ReadUncommitted = 'read uncommitted'
RepeatableRead = 'repeatable read'
Serializable = 'serializable'
class testgres.enums.NodeStatus

Bases: enum.IntEnum

Status of a PostgresNode

Running = 0
Stopped = 1
Uninitialized = 2
class testgres.enums.ProcessType

Bases: enum.Enum

Types of processes

AutovacuumLauncher = 'autovacuum launcher'
BackgroundWriter = 'background writer'
Checkpointer = 'checkpointer'
LogicalReplicationLauncher = 'logical replication launcher'
Startup = 'startup'
StatsCollector = 'stats collector'
Unknown = 'unknown'
WalReceiver = 'wal receiver'
WalSender = 'wal sender'
WalWriter = 'wal writer'
class testgres.enums.XLogMethod

Bases: enum.Enum

Available WAL methods for NodeBackup

fetch = 'fetch'
none = 'none'
stream = 'stream'

testgres.exceptions

exception testgres.exceptions.BackupException

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.CatchUpException(message=None, query=None)

Bases: testgres.exceptions.QueryException

exception testgres.exceptions.ExecUtilException(message=None, command=None, exit_code=0, out=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.InitNodeException

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.QueryException(message=None, query=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.StartNodeException(message=None, files=None)

Bases: testgres.exceptions.TestgresException

exception testgres.exceptions.TestgresException

Bases: Exception

exception testgres.exceptions.TimeoutException(message=None, query=None)

Bases: testgres.exceptions.QueryException
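
The "Bases" lines above imply a hierarchy in which catching QueryException also catches TimeoutException and CatchUpException. The sketch below shows the effect on `except` ordering. The classes are local mirrors declared only so the example runs without testgres (real code would import them from testgres.exceptions), and `classify` is a hypothetical helper.

```python
# Local mirrors of the documented hierarchy; NOT the testgres classes.
class TestgresException(Exception):
    pass


class QueryException(TestgresException):
    pass


class TimeoutException(QueryException):
    pass


class CatchUpException(QueryException):
    pass


class StartNodeException(TestgresException):
    pass


def classify(exc):
    """Report which handler catches exc, listing specific handlers first."""
    try:
        raise exc
    except QueryException:
        return "query"
    except TestgresException:
        return "testgres"
    except Exception:
        return "other"


timeout_kind = classify(TimeoutException())
start_kind = classify(StartNodeException())
other_kind = classify(ValueError())
```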

testgres.node

class testgres.node.PostgresNode(name=None, port=None, base_dir=None)
__init__(name=None, port=None, base_dir=None)

PostgresNode constructor.

Parameters:
  • name – node’s application name.
  • port – port to accept connections.
  • base_dir – path to node’s data directory.
append_conf(line='', filename='postgresql.conf', **kwargs)

Append line to a config file.

Parameters:
  • line – string to be appended to config.
  • filename – config file (postgresql.conf by default).
  • **kwargs – named config options.
Returns:

This instance of PostgresNode.

Examples

>>> append_conf(fsync=False)
>>> append_conf('log_connections = yes')
>>> append_conf(random_page_cost=1.5, fsync=True, ...)
>>> append_conf('synchronous_commit = off', filename='postgresql.conf')
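
To make the named-options form more concrete, here is one plausible way keyword arguments could be turned into postgresql.conf lines. `render_options` is a hypothetical helper and the formatting rules are assumptions based on postgresql.conf syntax (booleans as on/off, strings in single quotes). It is not taken from the testgres source.

```python
def render_options(**options):
    """Turn keyword arguments into 'name = value' config lines."""
    lines = []
    for name, value in sorted(options.items()):
        if isinstance(value, bool):
            # Check bool before anything else: bool is a subclass of int.
            text = "on" if value else "off"
        elif isinstance(value, str):
            # Single quotes inside a quoted value are doubled.
            text = "'" + value.replace("'", "''") + "'"
        else:
            text = str(value)
        lines.append("{} = {}".format(name, text))
    return lines


rendered = render_options(fsync=False, random_page_cost=1.5, log_statement="all")
```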
auxiliary_pids

Returns a dict of {ProcessType: PID}.

auxiliary_processes

Returns a list of auxiliary processes. Each process is represented by ProcessProxy object.

backup(**kwargs)

Perform pg_basebackup.

Parameters:
  • username – database user name.
  • xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
  • base_dir – the base directory for data files and logs
Returns:

A smart object of type NodeBackup.

catchup(dbname=None, username=None)

Wait until async replica catches up with its master.

child_processes

Returns a list of all child processes. Each process is represented by ProcessProxy object.

cleanup(max_attempts=3)

Stop node if needed and remove its data/logs directory. NOTE: take a look at GlobalConfig.node_cleanup_full.

Parameters: max_attempts – how many times should we try to stop()?
Returns: This instance of PostgresNode.
connect(dbname=None, username=None, password=None, autocommit=False)

Connect to a database.

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • password – user’s password.
  • autocommit – commit each statement automatically. It should also be set to True for statements that must run outside a transaction, such as VACUUM or CREATE DATABASE.
Returns:

An instance of NodeConnection.

default_conf(fsync=False, unix_sockets=True, allow_streaming=True, allow_logical=False, log_statement='all')

Apply default settings to this node.

Parameters:
  • fsync – should this node use fsync to keep data safe?
  • unix_sockets – should we enable UNIX sockets?
  • allow_streaming – should this node add a hba entry for replication?
  • allow_logical – can this node be used as a logical replication publisher?
  • log_statement – one of (‘all’, ‘off’, ‘mod’, ‘ddl’).
Returns:

This instance of PostgresNode.

dump(filename=None, dbname=None, username=None, format=<DumpFormat.Plain: 'plain'>)

Dump database into a file using pg_dump. NOTE: the file is not removed automatically.

Parameters:
  • filename – database dump taken by pg_dump.
  • dbname – database name to connect to.
  • username – database user name.
  • format – format argument plain/custom/directory/tar.
Returns:

Path to a file containing dump.

execute(query, dbname=None, username=None, password=None, commit=True)

Execute a query and return all rows as list.

Parameters:
  • query – query to be executed.
  • dbname – database name to connect to.
  • username – database user name.
  • password – user’s password.
  • commit – should we commit this query?
Returns:

A list of tuples representing rows.

free_port()

Reclaim port owned by this node. NOTE: does not free auto selected ports.

get_control_data()

Return contents of pg_control file.

init(initdb_params=None, **kwargs)

Perform initdb for this node.

Parameters:
  • initdb_params – parameters for initdb (list).
  • fsync – should this node use fsync to keep data safe?
  • unix_sockets – should we enable UNIX sockets?
  • allow_streaming – should this node add a hba entry for replication?
Returns:

This instance of PostgresNode

pg_ctl(params)

Invoke pg_ctl with params.

Parameters: params – arguments for pg_ctl.
Returns: Stdout + stderr of pg_ctl.
pgbench(dbname=None, username=None, stdout=None, stderr=None, options=[])

Spawn a pgbench process.

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • stdout – stdout file to be used by Popen.
  • stderr – stderr file to be used by Popen.
  • options – additional options for pgbench (list).
Returns:

Process created by subprocess.Popen.

pgbench_init(**kwargs)

Small wrapper for pgbench_run(). Sets initialize=True.

Returns: This instance of PostgresNode.
pgbench_run(dbname=None, username=None, options=[], **kwargs)

Run pgbench with some options. This event is logged (see self.utils_log_file).

Parameters:
  • dbname – database name to connect to.
  • username – database user name.
  • options – additional options for pgbench (list).
  • **kwargs – named options for pgbench. Run pgbench --help to learn more.
Returns:

Stdout produced by pgbench.

Examples

>>> pgbench_run(initialize=True, scale=2)
>>> pgbench_run(time=10)
pid

Return postmaster’s PID if node is running, else 0.

poll_query_until(query, dbname=None, username=None, max_attempts=0, sleep_time=1, expected=True, commit=True, suppress=None)

Run a query once per second until it returns ‘expected’. Query should return a single value (1 row, 1 column).

Parameters:
  • query – query to be executed.
  • dbname – database name to connect to.
  • username – database user name.
  • max_attempts – how many times should we try? 0 == infinite
  • sleep_time – how much should we sleep after a failure?
  • expected – what should be returned to break the cycle?
  • commit – should (possible) changes be committed?
  • suppress – a collection of exceptions to be suppressed.

Examples

>>> poll_query_until('select true')
>>> poll_query_until("select now() > '01.01.2018'", dbname='postgres')
>>> poll_query_until('select false', expected=True, max_attempts=4)
>>> poll_query_until('select 1', suppress={testgres.OperationalError})
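
The retry behaviour described by these parameters can be sketched without a database. `poll_until` below is a hypothetical stand-alone model, not the testgres implementation: it polls a Python callable instead of running a query, and it raises the built-in TimeoutError where testgres has its own TimeoutException.

```python
import time


def poll_until(fetch, expected=True, max_attempts=0, sleep_time=1, suppress=None):
    """Call fetch() until it returns expected; return the attempts used."""
    suppress = tuple(suppress or ())  # an empty tuple suppresses nothing
    attempts = 0
    while max_attempts == 0 or attempts < max_attempts:  # 0 means no limit
        attempts += 1
        try:
            if fetch() == expected:
                return attempts
        except suppress:
            pass  # listed exceptions count as a failed attempt
        time.sleep(sleep_time)
    raise TimeoutError("condition not met in {} attempts".format(max_attempts))


# Succeeds on the third call.
answers = iter([False, False, True])
attempts_needed = poll_until(lambda: next(answers), sleep_time=0)


# The first call raises, which is suppressed; the second call succeeds.
def flaky():
    flaky.calls += 1
    if flaky.calls == 1:
        raise ValueError("not ready yet")
    return True


flaky.calls = 0
attempts_with_error = poll_until(flaky, sleep_time=0, suppress={ValueError})

# Never succeeds, so the attempt limit is hit.
try:
    poll_until(lambda: False, max_attempts=2, sleep_time=0)
    timed_out = False
except TimeoutError:
    timed_out = True
```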
promote(dbname=None, username=None)

Promote standby instance to master using pg_ctl. For PostgreSQL versions below 10, some additional actions are required to ensure that the instance has become writable, hence the dbname and username parameters may be needed.

Returns: This instance of PostgresNode.
psql(query=None, filename=None, dbname=None, username=None, input=None, **variables)

Execute a query using psql.

Parameters:
  • query – query to be executed.
  • filename – file with a query.
  • dbname – database name to connect to.
  • username – database user name.
  • input – raw input to be passed.
  • **variables – vars to be set before execution.
Returns:

A tuple of (code, stdout, stderr).

Examples

>>> psql('select 1')
>>> psql('select 2', dbname='postgres')
>>> psql(query='select 3', ON_ERROR_STOP=1)
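
Since psql() returns a (code, stdout, stderr) tuple rather than raising, callers usually unpack it and check the exit code themselves. The sketch below shows one way to do that. `psql_checked` is a hypothetical helper, and `_FakeNode` is a stand-in whose byte-string outputs are an assumption made so the example runs without a server.

```python
def psql_checked(node, query, **kwargs):
    """Run node.psql() and raise if psql exits with a non-zero code."""
    code, out, err = node.psql(query, **kwargs)
    if code != 0:
        raise RuntimeError("psql exited with code {}: {!r}".format(code, err))
    return out


class _FakeNode:
    """Mimics the documented return shape of psql(); NOT part of testgres."""

    def psql(self, query=None, **kwargs):
        if query == "fail":
            return 1, b"", b"ERROR: simulated"
        return 0, b"1\n", b""


node = _FakeNode()
output = psql_checked(node, "select 1")

try:
    psql_checked(node, "fail")
    raised = False
except RuntimeError:
    raised = True
```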
publish(name, **kwargs)

Create publication for logical replication

Parameters:
  • name – publication name
  • tables – tables names list
  • dbname – database name where objects of interest are located
  • username – replication username
reload(params=[])

Reload config files using pg_ctl.

Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
replicate(name=None, slot=None, **kwargs)

Create a binary replica of this node.

Parameters:
  • name – replica’s application name.
  • slot – create a replication slot with the specified name.
  • username – database user name.
  • xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
  • base_dir – the base directory for data files and logs
restart(params=[])

Restart this node using pg_ctl.

Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
restore(filename, dbname=None, username=None)

Restore database from pg_dump’s file.

Parameters:
  • filename – database dump taken by pg_dump in custom/directory/tar formats.
  • dbname – database name to connect to.
  • username – database user name.
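
Because dump() leaves its output file behind and restore() expects a custom, directory or tar dump, the two calls are often paired with explicit cleanup. The sketch below assumes a single-file dump format. `copy_database` is a hypothetical helper, and `_FakeNode` is a stand-in that mimics dump()/restore() with plain files so the example runs without a server.

```python
import os
import tempfile


def copy_database(source, target, dbname, dump_format):
    """Dump dbname from source, restore it into target, delete the dump."""
    path = source.dump(dbname=dbname, format=dump_format)
    try:
        target.restore(path, dbname=dbname)
    finally:
        # dump() does not remove the file, so the caller cleans it up.
        os.remove(path)
    return path


class _FakeNode:
    """Mimics dump()/restore() with plain files; NOT part of testgres."""

    def __init__(self, data=""):
        self.data = data

    def dump(self, filename=None, dbname=None, username=None, format=None):
        fd, path = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(self.data)
        return path

    def restore(self, filename, dbname=None, username=None):
        with open(filename) as f:
            self.data = f.read()


src, dst = _FakeNode("rows"), _FakeNode()
dump_path = copy_database(src, dst, "postgres", "custom")
```

With real nodes, the `dump_format` argument would be a DumpFormat member such as DumpFormat.Custom rather than the plain string used by the stand-in.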
safe_psql(query=None, **kwargs)

Execute a query using psql.

Parameters:
  • query – query to be executed.
  • filename – file with a query.
  • dbname – database name to connect to.
  • username – database user name.
  • input – raw input to be passed.
  • **kwargs – are passed to psql().
Returns:

psql’s output as str.

source_walsender

Returns master’s walsender feeding this replica.

start(params=[], wait=True)

Start this node using pg_ctl.

Parameters:
  • params – additional arguments for pg_ctl.
  • wait – wait until operation completes.
Returns:

This instance of PostgresNode.

status()

Check this node’s status.

Returns: An instance of NodeStatus.
stop(params=[], wait=True)

Stop this node using pg_ctl.

Parameters:
  • params – additional arguments for pg_ctl.
  • wait – wait until operation completes.
Returns:

This instance of PostgresNode.

subscribe(publication, name, dbname=None, username=None, **params)

Create subscription for logical replication

Parameters:
  • name – subscription name
  • publication – publication object obtained from publish()
  • dbname – database name
  • username – replication username
  • params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details)
version

Return PostgreSQL version for this node.

Returns: Instance of distutils.version.LooseVersion.
class testgres.node.ProcessProxy(process, ptype=None)

Wrapper for psutil.Process

process

wrapped psutil.Process object

ptype

instance of ProcessType

testgres.pubsub

Unlike physical replication, logical replication allows users to replicate only specified databases and tables. It uses a publish-subscribe model with possibly multiple publishers and multiple subscribers. When initializing the publisher’s node, allow_logical=True should be passed to the PostgresNode.init() method so that PostgreSQL writes the extra information needed by logical replication to the WAL.

To replicate table X from node A to node B, the same table structure must be defined on the subscriber’s node, because logical replication does not replicate DDL. After that, the publish() and subscribe() methods may be used to set up replication. Example:

>>> from testgres import get_new_node
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
...     nodeA.init(allow_logical=True).start()
...     nodeB.init().start()
...
...     # create same table both on publisher and subscriber
...     create_table = 'create table test (a int, b int)'
...     nodeA.safe_psql(create_table)
...     nodeB.safe_psql(create_table)
...
...     # create publication
...     pub = nodeA.publish('mypub')
...     # create subscription
...     sub = nodeB.subscribe(pub, 'mysub')
...
...     # insert some data to the publisher's node
...     nodeA.execute('insert into test values (1, 1), (2, 2)')
...
...     # wait until changes apply on subscriber and check them
...     sub.catchup()
...
...     # read the data from subscriber's node
...     nodeB.execute('select * from test')
PostgresNode(name='...', port=..., base_dir='...')
PostgresNode(name='...', port=..., base_dir='...')
''
''
[(1, 1), (2, 2)]
class testgres.node.Publication(name, node, tables=None, dbname=None, username=None)
__init__(name, node, tables=None, dbname=None, username=None)

Constructor. Use PostgresNode.publish() instead of constructing publication objects directly.

Parameters:
  • name – publication name.
  • node – publisher’s node.
  • tables – tables list or None for all tables.
  • dbname – database name used to connect and perform subscription.
  • username – username used to connect to the database.
add_tables(tables, dbname=None, username=None)

Add tables to the publication. Cannot be used if publication was created with empty tables list.

Parameters: tables – a list of tables to be added to the publication.
drop(dbname=None, username=None)

Drop publication

class testgres.node.Subscription(node, publication, name=None, dbname=None, username=None, **params)
__init__(node, publication, name=None, dbname=None, username=None, **params)

Constructor. Use PostgresNode.subscribe() instead of constructing subscription objects directly.

Parameters:
  • name – subscription name.
  • node – subscriber’s node.
  • publication – Publication object we are subscribing to (see PostgresNode.publish()).
  • dbname – database name used to connect and perform subscription.
  • username – username used to connect to the database.
  • params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details).

catchup(username=None)

Wait until subscription catches up with publication.

Parameters: username – remote node’s user name.
disable(dbname=None, username=None)

Disables the running subscription.

drop(dbname=None, username=None)

Drops subscription

enable(dbname=None, username=None)

Enables the previously disabled subscription.

refresh(copy_data=True, dbname=None, username=None)

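Refreshes the subscription (ALTER SUBSCRIPTION ... REFRESH PUBLICATION). copy_data controls whether existing data in newly subscribed tables is copied.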