testgres package¶
testgres.api¶
Testing framework for PostgreSQL and its extensions
This module was created under influence of Postgres TAP test feature (PostgresNode.pm module). It can manage Postgres clusters: initialize, edit configuration files, start/stop cluster, execute queries. The typical flow may look like:
>>> with get_new_node() as node:
... node.init().start()
... result = node.safe_psql('postgres', 'select 1')
... print(result.decode('utf-8').strip())
... node.stop()
PostgresNode(name='...', port=..., base_dir='...')
1
PostgresNode(name='...', port=..., base_dir='...')
Or:
>>> with get_new_node() as master:
... master.init().start()
... with master.backup() as backup:
... with backup.spawn_replica() as replica:
... replica = replica.start()
... master.execute('postgres', 'create table test (val int4)')
... master.execute('postgres', 'insert into test values (0), (1), (2)')
... replica.catchup() # wait until changes are visible
... print(replica.execute('postgres', 'select count(*) from test'))
PostgresNode(name='...', port=..., base_dir='...')
[(3,)]
-
testgres.api.
get_new_node
(name=None, base_dir=None, **kwargs)¶ Simply a wrapper around
PostgresNode
constructor. SeePostgresNode.__init__()
for details.
testgres.backup¶
-
class
testgres.backup.
NodeBackup
(node, base_dir=None, username=None, xlog_method=<XLogMethod.fetch: 'fetch'>)¶ Bases:
object
Smart object responsible for backups
-
cleanup
()¶ Remove all files that belong to this backup. No-op if it’s been converted to a PostgresNode (destroy=True).
-
log_file
¶
-
spawn_primary
(name=None, destroy=True)¶ Create a primary node from a backup.
Parameters: - name – primary’s application name.
- destroy – should we convert this backup into a node?
Returns: New instance of
PostgresNode
.
-
spawn_replica
(name=None, destroy=True, slot=None)¶ Create a replica of the original node from a backup.
Parameters: - name – replica’s application name.
- slot – create a replication slot with the specified name.
- destroy – should we convert this backup into a node?
Returns: New instance of
PostgresNode
.
-
testgres.config¶
-
class
testgres.config.
GlobalConfig
(**options)¶ Bases:
object
Global configuration object which allows user to override default settings.
-
copy
()¶ Return a copy of this object.
-
items
()¶ Return setting-value pairs.
-
keys
()¶ Return a list of all available settings.
-
update
(config)¶ Extract setting-value pairs from ‘config’ and assign those values to corresponding settings of this GlobalConfig object.
-
cache_initdb
= True¶ shall we use cached initdb instance?
-
cache_pg_config
= True¶ shall we cache pg_config results?
-
cached_initdb_dir
¶ path to a temp directory for cached initdb.
-
cached_initdb_unique
= False¶ shall we give new node a unique system id?
-
error_log_lines
= 20¶ N of log lines to be shown in exceptions (0=inf).
-
node_cleanup_full
= True¶ shall we remove EVERYTHING (including logs)?
-
node_cleanup_on_bad_exit
= False¶ remove base_dir on __exit__() via exception.
-
node_cleanup_on_good_exit
= True¶ remove base_dir on nominal __exit__().
-
temp_dir
¶ path to temp dir containing nodes with default ‘base_dir’.
-
use_python_logging
= False¶ enable python logging subsystem (see logger.py).
-
-
testgres.config.
configure_testgres
(**options)¶ Adjust current global options. Look at the GlobalConfig to learn about existing settings.
-
testgres.config.
pop_config
()¶ Set previous GlobalConfig options from stack.
-
testgres.config.
push_config
(**options)¶ Permanently set custom GlobalConfig options and put previous settings on top of the config stack.
-
testgres.config.
scoped_config
(**options)¶ Temporarily set custom GlobalConfig options for this context. Previous options are pushed to the config stack.
Example
>>> from .api import get_new_node >>> with scoped_config(cache_initdb=False): ... # create a new node with fresh initdb ... with get_new_node().init().start() as node: ... print(node.execute('select 1')) [(1,)]
testgres.connection¶
-
class
testgres.connection.
NodeConnection
(node, dbname=None, username=None, password=None, autocommit=False)¶ Bases:
object
Transaction wrapper returned by Node
-
begin
(isolation_level=<IsolationLevel.ReadCommitted: 'read committed'>)¶
-
close
()¶
-
commit
()¶
-
connection
¶
-
cursor
¶
-
execute
(query, *args)¶
-
node
¶
-
pid
¶
-
rollback
()¶
-
testgres.enums¶
-
class
testgres.enums.
DumpFormat
¶ Bases:
enum.Enum
Available dump formats
-
Custom
= 'custom'¶
-
Directory
= 'directory'¶
-
Plain
= 'plain'¶
-
Tar
= 'tar'¶
-
-
class
testgres.enums.
IsolationLevel
¶ Bases:
enum.Enum
Transaction isolation level for
NodeConnection
-
ReadCommitted
= 'read committed'¶
-
ReadUncommitted
= 'read uncommitted'¶
-
RepeatableRead
= 'repeatable read'¶
-
Serializable
= 'serializable'¶
-
-
class
testgres.enums.
NodeStatus
¶ Bases:
enum.IntEnum
Status of a PostgresNode
-
Running
= 0¶
-
Stopped
= 1¶
-
Uninitialized
= 2¶
-
-
class
testgres.enums.
ProcessType
¶ Bases:
enum.Enum
Types of processes
-
AutovacuumLauncher
= 'autovacuum launcher'¶
-
BackgroundWriter
= 'background writer'¶
-
Checkpointer
= 'checkpointer'¶
-
LogicalReplicationLauncher
= 'logical replication launcher'¶
-
Startup
= 'startup'¶
-
StatsCollector
= 'stats collector'¶
-
Unknown
= 'unknown'¶
-
WalReceiver
= 'wal receiver'¶
-
WalSender
= 'wal sender'¶
-
WalWriter
= 'wal writer'¶
-
-
class
testgres.enums.
XLogMethod
¶ Bases:
enum.Enum
Available WAL methods for
NodeBackup
-
fetch
= 'fetch'¶
-
none
= 'none'¶
-
stream
= 'stream'¶
-
testgres.exceptions¶
-
exception
testgres.exceptions.
BackupException
¶
-
exception
testgres.exceptions.
CatchUpException
(message=None, query=None)¶
-
exception
testgres.exceptions.
ExecUtilException
(message=None, command=None, exit_code=0, out=None)¶
-
exception
testgres.exceptions.
InitNodeException
¶
-
exception
testgres.exceptions.
QueryException
(message=None, query=None)¶
-
exception
testgres.exceptions.
StartNodeException
(message=None, files=None)¶
-
exception
testgres.exceptions.
TestgresException
¶ Bases:
Exception
-
exception
testgres.exceptions.
TimeoutException
(message=None, query=None)¶
testgres.node¶
-
class
testgres.node.
PostgresNode
(name=None, port=None, base_dir=None)¶ -
__init__
(name=None, port=None, base_dir=None)¶ PostgresNode constructor.
Parameters: - name – node’s application name.
- port – port to accept connections.
- base_dir – path to node’s data directory.
-
append_conf
(line='', filename='postgresql.conf', **kwargs)¶ Append line to a config file.
Parameters: - line – string to be appended to config.
- filename – config file (postgresql.conf by default).
- **kwargs – named config options.
Returns: This instance of
PostgresNode
.Examples
>>> append_conf(fsync=False) >>> append_conf('log_connections = yes') >>> append_conf(random_page_cost=1.5, fsync=True, ...) >>> append_conf('postgresql.conf', 'synchronous_commit = off')
-
auxiliary_pids
¶ Returns a dict of { ProcessType – PID }.
-
auxiliary_processes
¶ Returns a list of auxiliary processes. Each process is represented by
ProcessProxy
object.
-
backup
(**kwargs)¶ Perform pg_basebackup.
Parameters: - username – database user name.
- xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
- base_dir – the base directory for data files and logs
Returns: A smart object of type NodeBackup.
-
catchup
(dbname=None, username=None)¶ Wait until async replica catches up with its master.
-
child_processes
¶ Returns a list of all child processes. Each process is represented by
ProcessProxy
object.
-
cleanup
(max_attempts=3)¶ Stop node if needed and remove its data/logs directory. NOTE: take a look at TestgresConfig.node_cleanup_full.
Parameters: max_attempts – how many times should we try to stop()? Returns: This instance of PostgresNode
.
-
connect
(dbname=None, username=None, password=None, autocommit=False)¶ Connect to a database.
Parameters: - dbname – database name to connect to.
- username – database user name.
- password – user’s password.
- autocommit – commit each statement automatically. Also it should be set to True for statements requiring to be run outside a transaction? such as VACUUM or CREATE DATABASE.
Returns: An instance of
NodeConnection
.
-
default_conf
(fsync=False, unix_sockets=True, allow_streaming=True, allow_logical=False, log_statement='all')¶ Apply default settings to this node.
Parameters: - fsync – should this node use fsync to keep data safe?
- unix_sockets – should we enable UNIX sockets?
- allow_streaming – should this node add a hba entry for replication?
- allow_logical – can this node be used as a logical replication publisher?
- log_statement – one of (‘all’, ‘off’, ‘mod’, ‘ddl’).
Returns: This instance of
PostgresNode
.
-
dump
(filename=None, dbname=None, username=None, format=<DumpFormat.Plain: 'plain'>)¶ Dump database into a file using pg_dump. NOTE: the file is not removed automatically.
Parameters: - filename – database dump taken by pg_dump.
- dbname – database name to connect to.
- username – database user name.
- format – format argument plain/custom/directory/tar.
Returns: Path to a file containing dump.
-
execute
(query, dbname=None, username=None, password=None, commit=True)¶ Execute a query and return all rows as list.
Parameters: - query – query to be executed.
- dbname – database name to connect to.
- username – database user name.
- password – user’s password.
- commit – should we commit this query?
Returns: A list of tuples representing rows.
-
free_port
()¶ Reclaim port owned by this node. NOTE: does not free auto selected ports.
-
get_control_data
()¶ Return contents of pg_control file.
-
init
(initdb_params=None, **kwargs)¶ Perform initdb for this node.
Parameters: - initdb_params – parameters for initdb (list).
- fsync – should this node use fsync to keep data safe?
- unix_sockets – should we enable UNIX sockets?
- allow_streaming – should this node add a hba entry for replication?
Returns: This instance of
PostgresNode
-
pg_ctl
(params)¶ Invoke pg_ctl with params.
Parameters: params – arguments for pg_ctl. Returns: Stdout + stderr of pg_ctl.
-
pgbench
(dbname=None, username=None, stdout=None, stderr=None, options=[])¶ Spawn a pgbench process.
Parameters: - dbname – database name to connect to.
- username – database user name.
- stdout – stdout file to be used by Popen.
- stderr – stderr file to be used by Popen.
- options – additional options for pgbench (list).
Returns: Process created by subprocess.Popen.
-
pgbench_init
(**kwargs)¶ Small wrapper for pgbench_run(). Sets initialize=True.
Returns: This instance of PostgresNode
.
-
pgbench_run
(dbname=None, username=None, options=[], **kwargs)¶ Run pgbench with some options. This event is logged (see self.utils_log_file).
Parameters: - dbname – database name to connect to.
- username – database user name.
- options – additional options for pgbench (list).
- **kwargs – named options for pgbench. Run pgbench –help to learn more.
Returns: Stdout produced by pgbench.
Examples
>>> pgbench_run(initialize=True, scale=2) >>> pgbench_run(time=10)
-
pid
¶ Return postmaster’s PID if node is running, else 0.
-
poll_query_until
(query, dbname=None, username=None, max_attempts=0, sleep_time=1, expected=True, commit=True, suppress=None)¶ Run a query once per second until it returns ‘expected’. Query should return a single value (1 row, 1 column).
Parameters: - query – query to be executed.
- dbname – database name to connect to.
- username – database user name.
- max_attempts – how many times should we try? 0 == infinite
- sleep_time – how much should we sleep after a failure?
- expected – what should be returned to break the cycle?
- commit – should (possible) changes be committed?
- suppress – a collection of exceptions to be suppressed.
Examples
>>> poll_query_until('select true') >>> poll_query_until('postgres', "select now() > '01.01.2018'") >>> poll_query_until('select false', expected=True, max_attempts=4) >>> poll_query_until('select 1', suppress={testgres.OperationalError})
-
promote
(dbname=None, username=None)¶ Promote standby instance to master using pg_ctl. For PostgreSQL versions below 10 some additional actions required to ensure that instance became writable and hence dbname and username parameters may be needed.
Returns: This instance of PostgresNode
.
-
psql
(query=None, filename=None, dbname=None, username=None, input=None, **variables)¶ Execute a query using psql.
Parameters: - query – query to be executed.
- filename – file with a query.
- dbname – database name to connect to.
- username – database user name.
- input – raw input to be passed.
- **variables – vars to be set before execution.
Returns: A tuple of (code, stdout, stderr).
Examples
>>> psql('select 1') >>> psql('postgres', 'select 2') >>> psql(query='select 3', ON_ERROR_STOP=1)
-
publish
(name, **kwargs)¶ Create publication for logical replication
Parameters: - pubname – publication name
- tables – tables names list
- dbname – database name where objects or interest are located
- username – replication username
-
reload
(params=[])¶ Reload config files using pg_ctl.
Parameters: params – additional arguments for pg_ctl. Returns: This instance of PostgresNode
.
-
replicate
(name=None, slot=None, **kwargs)¶ Create a binary replica of this node.
Parameters: - name – replica’s application name.
- slot – create a replication slot with the specified name.
- username – database user name.
- xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
- base_dir – the base directory for data files and logs
-
restart
(params=[])¶ Restart this node using pg_ctl.
Parameters: params – additional arguments for pg_ctl. Returns: This instance of PostgresNode
.
-
restore
(filename, dbname=None, username=None)¶ Restore database from pg_dump’s file.
Parameters: - filename – database dump taken by pg_dump in custom/directory/tar formats.
- dbname – database name to connect to.
- username – database user name.
-
safe_psql
(query=None, **kwargs)¶ Execute a query using psql.
Parameters: - query – query to be executed.
- filename – file with a query.
- dbname – database name to connect to.
- username – database user name.
- input – raw input to be passed.
- are passed to psql() (**kwargs) –
Returns: psql’s output as str.
-
source_walsender
¶ Returns master’s walsender feeding this replica.
-
start
(params=[], wait=True)¶ Start this node using pg_ctl.
Parameters: - params – additional arguments for pg_ctl.
- wait – wait until operation completes.
Returns: This instance of
PostgresNode
.
-
status
()¶ Check this node’s status.
Returns: An instance of NodeStatus
.
-
stop
(params=[], wait=True)¶ Stop this node using pg_ctl.
Parameters: - params – additional arguments for pg_ctl.
- wait – wait until operation completes.
Returns: This instance of
PostgresNode
.
-
subscribe
(publication, name, dbname=None, username=None, **params)¶ Create subscription for logical replication
Parameters: - name – subscription name
- publication – publication object obtained from publish()
- dbname – database name
- username – replication username
- params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details)
-
version
¶ Return PostgreSQL version for this node.
Returns: Instance of distutils.version.LooseVersion
.
-
testgres.pubsub¶
Unlike physical replication the logical replication allows users replicate only
specified databases and tables. It uses publish-subscribe model with possibly
multiple publishers and multiple subscribers. When initializing publisher’s
node allow_logical=True
should be passed to the PostgresNode.init()
method to enable PostgreSQL to write extra information to the WAL needed by
logical replication.
To replicate table X
from node A to node B the same table structure should
be defined on the subscriber’s node as logical replication don’t replicate DDL.
After that publish()
and subscribe()
methods may be used to setup replication. Example:
>>> from testgres import get_new_node
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
... nodeA.init(allow_logical=True).start()
... nodeB.init().start()
...
... # create same table both on publisher and subscriber
... create_table = 'create table test (a int, b int)'
... nodeA.safe_psql(create_table)
... nodeB.safe_psql(create_table)
...
... # create publication
... pub = nodeA.publish('mypub')
... # create subscription
... sub = nodeB.subscribe(pub, 'mysub')
...
... # insert some data to the publisher's node
... nodeA.execute('insert into test values (1, 1), (2, 2)')
...
... # wait until changes apply on subscriber and check them
... sub.catchup()
...
... # read the data from subscriber's node
... nodeB.execute('select * from test')
PostgresNode(name='...', port=..., base_dir='...')
PostgresNode(name='...', port=..., base_dir='...')
''
''
[(1, 1), (2, 2)]
-
class
testgres.node.
Publication
(name, node, tables=None, dbname=None, username=None)¶ -
__init__
(name, node, tables=None, dbname=None, username=None)¶ Constructor. Use
PostgresNode.publish()
instead of direct constructing publication objects.Parameters: - name – publication name.
- node – publisher’s node.
- tables – tables list or None for all tables.
- dbname – database name used to connect and perform subscription.
- username – username used to connect to the database.
-
add_tables
(tables, dbname=None, username=None)¶ Add tables to the publication. Cannot be used if publication was created with empty tables list.
Parameters: tables – a list of tables to be added to the publication.
-
drop
(dbname=None, username=None)¶ Drop publication
-
-
class
testgres.node.
Subscription
(node, publication, name=None, dbname=None, username=None, **params)¶ -
__init__
(node, publication, name=None, dbname=None, username=None, **params)¶ Constructor. Use
PostgresNode.subscribe()
instead of direct constructing subscription objects.Parameters: - name – subscription name.
- node – subscriber’s node.
- publication –
Publication
object we are subscribing to (seePostgresNode.publish()
). - dbname – database name used to connect and perform subscription.
- username – username used to connect to the database.
- params –
subscription parameters (see documentation on CREATE SUBSCRIPTION for details).
-
catchup
(username=None)¶ Wait until subscription catches up with publication.
Parameters: username – remote node’s user name.
-
disable
(dbname=None, username=None)¶ Disables the running subscription.
-
drop
(dbname=None, username=None)¶ Drops subscription
-
enable
(dbname=None, username=None)¶ Enables the previously disabled subscription.
-
refresh
(copy_data=True, dbname=None, username=None)¶ Disables the running subscription.
-