testgres package¶
testgres.api¶
Testing framework for PostgreSQL and its extensions
This module was inspired by the PostgreSQL TAP test framework (the PostgresNode.pm module). It can manage PostgreSQL clusters: initialize them, edit configuration files, start and stop them, and execute queries. A typical flow may look like:
>>> with get_new_node() as node:
...     node.init().start()
...     result = node.safe_psql('select 1', dbname='postgres')
...     print(result.decode('utf-8').strip())
...     node.stop()
PostgresNode(name='...', port=..., base_dir='...')
1
PostgresNode(name='...', port=..., base_dir='...')
Or:
>>> with get_new_node() as master:
...     master.init().start()
...     with master.backup() as backup:
...         with backup.spawn_replica() as replica:
...             replica = replica.start()
...             master.execute('create table test (val int4)', dbname='postgres')
...             master.execute('insert into test values (0), (1), (2)', dbname='postgres')
...             replica.catchup()  # wait until changes are visible
...             print(replica.execute('select count(*) from test', dbname='postgres'))
PostgresNode(name='...', port=..., base_dir='...')
[(3,)]
-
testgres.api.get_new_node(name=None, base_dir=None, **kwargs)¶ Simply a wrapper around the
PostgresNode constructor. See PostgresNode.__init__() for details.
testgres.backup¶
-
class
testgres.backup.NodeBackup(node, base_dir=None, username=None, xlog_method=<XLogMethod.fetch: 'fetch'>)¶ Bases:
object
Smart object responsible for backups.
-
cleanup()¶ Remove all files that belong to this backup. No-op if it’s been converted to a PostgresNode (destroy=True).
-
log_file¶
-
spawn_primary(name=None, destroy=True)¶ Create a primary node from a backup.
Parameters: - name – primary’s application name.
- destroy – should we convert this backup into a node?
Returns: New instance of
PostgresNode.
-
spawn_replica(name=None, destroy=True, slot=None)¶ Create a replica of the original node from a backup.
Parameters: - name – replica’s application name.
- slot – create a replication slot with the specified name.
- destroy – should we convert this backup into a node?
Returns: New instance of
PostgresNode.
-
testgres.config¶
-
class
testgres.config.GlobalConfig(**options)¶ Bases:
object
Global configuration object which allows the user to override default settings.
-
copy()¶ Return a copy of this object.
-
items()¶ Return setting-value pairs.
-
keys()¶ Return a list of all available settings.
-
update(config)¶ Extract setting-value pairs from ‘config’ and assign those values to corresponding settings of this GlobalConfig object.
-
cache_initdb= True¶ shall we use cached initdb instance?
-
cache_pg_config= True¶ shall we cache pg_config results?
-
cached_initdb_dir¶ path to a temp directory for cached initdb.
-
cached_initdb_unique= False¶ shall we give new node a unique system id?
-
error_log_lines= 20¶ N of log lines to be shown in exceptions (0=inf).
-
node_cleanup_full= True¶ shall we remove EVERYTHING (including logs)?
-
node_cleanup_on_bad_exit= False¶ remove base_dir on __exit__() via exception.
-
node_cleanup_on_good_exit= True¶ remove base_dir on nominal __exit__().
-
temp_dir¶ path to temp dir containing nodes with default ‘base_dir’.
-
use_python_logging= False¶ enable python logging subsystem (see logger.py).
-
-
testgres.config.configure_testgres(**options)¶ Adjust current global options. Look at the GlobalConfig to learn about existing settings.
-
testgres.config.pop_config()¶ Set previous GlobalConfig options from stack.
-
testgres.config.push_config(**options)¶ Permanently set custom GlobalConfig options and put previous settings on top of the config stack.
-
testgres.config.scoped_config(**options)¶ Temporarily set custom GlobalConfig options for this context. Previous options are pushed to the config stack.
Example
>>> from .api import get_new_node
>>> with scoped_config(cache_initdb=False):
...     # create a new node with fresh initdb
...     with get_new_node().init().start() as node:
...         print(node.execute('select 1'))
[(1,)]
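The relationship between push_config(), pop_config() and scoped_config() can be pictured with a small stand-alone model. This is only a sketch of the stack behaviour described above, not testgres code: the ConfigStack class and its method names are hypothetical.

```python
import contextlib


class ConfigStack(object):
    """Hypothetical model of a settings stack; not part of testgres."""

    def __init__(self, **defaults):
        self.current = dict(defaults)
        self._stack = []

    def push(self, **options):
        # Save the active settings, then apply the overrides.
        self._stack.append(dict(self.current))
        self.current.update(options)

    def pop(self):
        # Restore the most recently saved settings.
        self.current = self._stack.pop()

    @contextlib.contextmanager
    def scoped(self, **options):
        # Push on entry and pop on exit, even if the body raises.
        self.push(**options)
        try:
            yield self
        finally:
            self.pop()


config = ConfigStack(cache_initdb=True, error_log_lines=20)
with config.scoped(cache_initdb=False):
    inside = config.current['cache_initdb']
after = config.current['cache_initdb']
print(inside, after)  # prints: False True
```

In this model a scoped override is simply a push that is guaranteed to be paired with a pop, which is why it suits settings that should apply to a single test only.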
testgres.connection¶
-
class
testgres.connection.NodeConnection(node, dbname=None, username=None, password=None, autocommit=False)¶ Bases:
object
Transaction wrapper returned by Node
-
begin(isolation_level=<IsolationLevel.ReadCommitted: 'read committed'>)¶
-
close()¶
-
commit()¶
-
connection¶
-
cursor¶
-
execute(query, *args)¶
-
node¶
-
pid¶
-
rollback()¶
-
testgres.enums¶
-
class
testgres.enums.DumpFormat¶ Bases:
enum.Enum
Available dump formats
-
Custom= 'custom'¶
-
Directory= 'directory'¶
-
Plain= 'plain'¶
-
Tar= 'tar'¶
-
-
class
testgres.enums.IsolationLevel¶ Bases:
enum.Enum
Transaction isolation level for NodeConnection
-
ReadCommitted= 'read committed'¶
-
ReadUncommitted= 'read uncommitted'¶
-
RepeatableRead= 'repeatable read'¶
-
Serializable= 'serializable'¶
-
-
class
testgres.enums.NodeStatus¶ Bases:
enum.IntEnum
Status of a PostgresNode
-
Running= 0¶
-
Stopped= 1¶
-
Uninitialized= 2¶
-
-
class
testgres.enums.ProcessType¶ Bases:
enum.Enum
Types of processes
-
AutovacuumLauncher= 'autovacuum launcher'¶
-
BackgroundWriter= 'background writer'¶
-
Checkpointer= 'checkpointer'¶
-
LogicalReplicationLauncher= 'logical replication launcher'¶
-
Startup= 'startup'¶
-
StatsCollector= 'stats collector'¶
-
Unknown= 'unknown'¶
-
WalReceiver= 'wal receiver'¶
-
WalSender= 'wal sender'¶
-
WalWriter= 'wal writer'¶
-
-
class
testgres.enums.XLogMethod¶ Bases:
enum.Enum
Available WAL methods for NodeBackup
-
fetch= 'fetch'¶
-
none= 'none'¶
-
stream= 'stream'¶
-
testgres.exceptions¶
-
exception
testgres.exceptions.BackupException¶
-
exception
testgres.exceptions.CatchUpException(message=None, query=None)¶
-
exception
testgres.exceptions.ExecUtilException(message=None, command=None, exit_code=0, out=None)¶
-
exception
testgres.exceptions.InitNodeException¶
-
exception
testgres.exceptions.QueryException(message=None, query=None)¶
-
exception
testgres.exceptions.StartNodeException(message=None, files=None)¶
-
exception
testgres.exceptions.TestgresException¶ Bases:
Exception
-
exception
testgres.exceptions.TimeoutException(message=None, query=None)¶
testgres.node¶
-
class
testgres.node.PostgresNode(name=None, port=None, base_dir=None)¶ -
__init__(name=None, port=None, base_dir=None)¶ PostgresNode constructor.
Parameters: - name – node’s application name.
- port – port to accept connections.
- base_dir – path to node’s data directory.
-
append_conf(line='', filename='postgresql.conf', **kwargs)¶ Append line to a config file.
Parameters: - line – string to be appended to config.
- filename – config file (postgresql.conf by default).
- **kwargs – named config options.
Returns: This instance of
PostgresNode.
Examples
>>> append_conf(fsync=False)
>>> append_conf('log_connections = yes')
>>> append_conf(random_page_cost=1.5, fsync=True, ...)
>>> append_conf('synchronous_commit = off', filename='postgresql.conf')
-
auxiliary_pids¶ Returns a dict of { ProcessType – PID }.
-
auxiliary_processes¶ Returns a list of auxiliary processes. Each process is represented by a
ProcessProxy object.
-
backup(**kwargs)¶ Perform pg_basebackup.
Parameters: - username – database user name.
- xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
- base_dir – the base directory for data files and logs
Returns: A smart object of type NodeBackup.
-
catchup(dbname=None, username=None)¶ Wait until async replica catches up with its master.
-
child_processes¶ Returns a list of all child processes. Each process is represented by a
ProcessProxy object.
-
cleanup(max_attempts=3)¶ Stop node if needed and remove its data/logs directory. NOTE: take a look at GlobalConfig.node_cleanup_full.
Parameters: max_attempts – how many times should we try to stop()?
Returns: This instance of PostgresNode.
-
connect(dbname=None, username=None, password=None, autocommit=False)¶ Connect to a database.
Parameters: - dbname – database name to connect to.
- username – database user name.
- password – user’s password.
- autocommit – commit each statement automatically. It should also be set to True for statements that must run outside a transaction, such as VACUUM or CREATE DATABASE.
Returns: An instance of
NodeConnection.
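The NodeConnection methods listed earlier (begin, execute, commit, rollback, close) can be combined into an explicit transaction. The helper below is a minimal sketch: run_in_transaction is a hypothetical name, and `node` is assumed to be a started PostgresNode. This reference does not say whether begin() is required before the first statement; it is called here only to make the transaction boundary explicit.

```python
def run_in_transaction(node, queries, dbname=None):
    """Run all queries in one transaction; roll back if any of them fails."""
    con = node.connect(dbname=dbname)
    try:
        con.begin()
        results = [con.execute(query) for query in queries]
        con.commit()
        return results
    except Exception:
        # Undo everything done since begin(), then re-raise for the caller.
        con.rollback()
        raise
    finally:
        # Always release the connection, whether we committed or rolled back.
        con.close()
```

For statements such as VACUUM, which cannot run inside a transaction, a connection opened with autocommit=True would be needed instead.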
-
default_conf(fsync=False, unix_sockets=True, allow_streaming=True, allow_logical=False, log_statement='all')¶ Apply default settings to this node.
Parameters: - fsync – should this node use fsync to keep data safe?
- unix_sockets – should we enable UNIX sockets?
- allow_streaming – should this node add a hba entry for replication?
- allow_logical – can this node be used as a logical replication publisher?
- log_statement – one of (‘all’, ‘off’, ‘mod’, ‘ddl’).
Returns: This instance of
PostgresNode.
-
dump(filename=None, dbname=None, username=None, format=<DumpFormat.Plain: 'plain'>)¶ Dump database into a file using pg_dump. NOTE: the file is not removed automatically.
Parameters: - filename – database dump taken by pg_dump.
- dbname – database name to connect to.
- username – database user name.
- format – format argument plain/custom/directory/tar.
Returns: Path to a file containing dump.
-
execute(query, dbname=None, username=None, password=None, commit=True)¶ Execute a query and return all rows as list.
Parameters: - query – query to be executed.
- dbname – database name to connect to.
- username – database user name.
- password – user’s password.
- commit – should we commit this query?
Returns: A list of tuples representing rows.
-
free_port()¶ Reclaim port owned by this node. NOTE: does not free auto selected ports.
-
get_control_data()¶ Return contents of pg_control file.
-
init(initdb_params=None, **kwargs)¶ Perform initdb for this node.
Parameters: - initdb_params – parameters for initdb (list).
- fsync – should this node use fsync to keep data safe?
- unix_sockets – should we enable UNIX sockets?
- allow_streaming – should this node add a hba entry for replication?
Returns: This instance of
PostgresNode
-
pg_ctl(params)¶ Invoke pg_ctl with params.
Parameters: params – arguments for pg_ctl.
Returns: Stdout + stderr of pg_ctl.
-
pgbench(dbname=None, username=None, stdout=None, stderr=None, options=[])¶ Spawn a pgbench process.
Parameters: - dbname – database name to connect to.
- username – database user name.
- stdout – stdout file to be used by Popen.
- stderr – stderr file to be used by Popen.
- options – additional options for pgbench (list).
Returns: Process created by subprocess.Popen.
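Because pgbench() returns the process object rather than its output, the caller decides how to wait for it. The following sketch (run_pgbench is a hypothetical helper, and `node` is assumed to be a running PostgresNode) captures the combined output and checks the exit code.

```python
import subprocess


def run_pgbench(node, options, dbname=None):
    """Spawn pgbench via node.pgbench() and wait for it to finish."""
    proc = node.pgbench(dbname=dbname,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        options=list(options))
    out, _ = proc.communicate()  # blocks until pgbench exits
    if proc.returncode != 0:
        raise RuntimeError(
            'pgbench exited with code {}'.format(proc.returncode))
    return out
```

A call such as run_pgbench(node, ['-i', '-s', '2']) would initialize the pgbench tables at scale factor 2, and run_pgbench(node, ['-T', '10']) would run the benchmark for ten seconds.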
-
pgbench_init(**kwargs)¶ Small wrapper for pgbench_run(). Sets initialize=True.
Returns: This instance of PostgresNode.
-
pgbench_run(dbname=None, username=None, options=[], **kwargs)¶ Run pgbench with some options. This event is logged (see self.utils_log_file).
Parameters: - dbname – database name to connect to.
- username – database user name.
- options – additional options for pgbench (list).
- **kwargs – named options for pgbench. Run pgbench --help to learn more.
Returns: Stdout produced by pgbench.
Examples
>>> pgbench_run(initialize=True, scale=2)
>>> pgbench_run(time=10)
-
pid¶ Return postmaster’s PID if node is running, else 0.
-
poll_query_until(query, dbname=None, username=None, max_attempts=0, sleep_time=1, expected=True, commit=True, suppress=None)¶ Run a query once per second until it returns ‘expected’. Query should return a single value (1 row, 1 column).
Parameters: - query – query to be executed.
- dbname – database name to connect to.
- username – database user name.
- max_attempts – how many times should we try? 0 == infinite
- sleep_time – how much should we sleep after a failure?
- expected – what should be returned to break the cycle?
- commit – should (possible) changes be committed?
- suppress – a collection of exceptions to be suppressed.
Examples
>>> poll_query_until('select true')
>>> poll_query_until("select now() > '01.01.2018'", dbname='postgres')
>>> poll_query_until('select false', expected=True, max_attempts=4)
>>> poll_query_until('select 1', suppress={testgres.OperationalError})
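The interplay of max_attempts, sleep_time, expected and suppress can be illustrated with a stand-alone loop. This is a sketch of the documented behaviour, not the library's implementation: poll_until is a hypothetical function that polls an arbitrary callable instead of a query, and it raises Python's built-in TimeoutError rather than the library's TimeoutException.

```python
import time


def poll_until(fetch, expected=True, max_attempts=0, sleep_time=1,
               suppress=()):
    """Call fetch() until it returns `expected`; max_attempts=0 means no limit."""
    attempts = 0
    while max_attempts == 0 or attempts < max_attempts:
        try:
            if fetch() == expected:
                return attempts + 1  # number of calls it took
        except tuple(suppress):
            pass  # a suppressed error counts as "not ready yet"
        attempts += 1
        time.sleep(sleep_time)
    raise TimeoutError(
        'condition not met after {} attempts'.format(attempts))
```

With max_attempts left at 0 the loop never gives up, so a finite limit is usually the safer choice in automated tests.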
-
promote(dbname=None, username=None)¶ Promote standby instance to master using pg_ctl. For PostgreSQL versions below 10, some additional actions are required to ensure that the instance has become writable, hence the dbname and username parameters may be needed.
Returns: This instance of PostgresNode.
-
psql(query=None, filename=None, dbname=None, username=None, input=None, **variables)¶ Execute a query using psql.
Parameters: - query – query to be executed.
- filename – file with a query.
- dbname – database name to connect to.
- username – database user name.
- input – raw input to be passed.
- **variables – vars to be set before execution.
Returns: A tuple of (code, stdout, stderr).
Examples
>>> psql('select 1')
>>> psql('select 2', dbname='postgres')
>>> psql(query='select 3', ON_ERROR_STOP=1)
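Since psql() reports failure through its return code rather than by raising, callers usually unpack the tuple and check the code themselves. A minimal sketch follows; psql_checked is a hypothetical helper and `node` is assumed to be a running PostgresNode.

```python
def psql_checked(node, query, dbname=None):
    """Run a query through node.psql() and fail loudly on a non-zero exit code."""
    # ON_ERROR_STOP=1 is passed as a psql variable, as in the examples above.
    code, out, err = node.psql(query=query, dbname=dbname, ON_ERROR_STOP=1)
    if code != 0:
        raise RuntimeError(
            'psql failed with code {}: {!r}'.format(code, err))
    return out
```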
-
publish(name, **kwargs)¶ Create a publication for logical replication.
Parameters: - name – publication name.
- tables – list of table names.
- dbname – name of the database where the objects of interest are located.
- username – replication username.
-
reload(params=[])¶ Reload config files using pg_ctl.
Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
-
replicate(name=None, slot=None, **kwargs)¶ Create a binary replica of this node.
Parameters: - name – replica’s application name.
- slot – create a replication slot with the specified name.
- username – database user name.
- xlog_method – a method for collecting the logs (‘fetch’ | ‘stream’).
- base_dir – the base directory for data files and logs
-
restart(params=[])¶ Restart this node using pg_ctl.
Parameters: params – additional arguments for pg_ctl.
Returns: This instance of PostgresNode.
-
restore(filename, dbname=None, username=None)¶ Restore database from pg_dump’s file.
Parameters: - filename – database dump taken by pg_dump in custom/directory/tar formats.
- dbname – database name to connect to.
- username – database user name.
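dump() and restore() can be paired to copy one database into another on the same node. The sketch below makes several assumptions: copy_database is a hypothetical helper, the target database already exists, and the caller passes a format that restore() accepts (for example the DumpFormat.Custom member listed under testgres.enums). Because dump() does not remove its output, the helper deletes the file afterwards; a directory-format dump is left in place.

```python
import os


def copy_database(node, src_dbname, dst_dbname, dump_format):
    """Dump src_dbname with pg_dump and load the result into dst_dbname."""
    path = node.dump(dbname=src_dbname, format=dump_format)
    try:
        node.restore(path, dbname=dst_dbname)
    finally:
        # dump() leaves the file behind, so clean it up here.
        if os.path.isfile(path):
            os.remove(path)
```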
-
safe_psql(query=None, **kwargs)¶ Execute a query using psql.
Parameters: - query – query to be executed.
- filename – file with a query.
- dbname – database name to connect to.
- username – database user name.
- input – raw input to be passed.
- **kwargs – are passed to psql().
Returns: psql’s output as str.
-
source_walsender¶ Returns master’s walsender feeding this replica.
-
start(params=[], wait=True)¶ Start this node using pg_ctl.
Parameters: - params – additional arguments for pg_ctl.
- wait – wait until operation completes.
Returns: This instance of
PostgresNode.
-
status()¶ Check this node’s status.
Returns: An instance of NodeStatus.
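The value returned by status() can drive simple state handling. The sketch below is illustrative only: ensure_running is a hypothetical helper, and the local NodeStatus class merely mirrors the values listed under testgres.enums so that the snippet is self-contained. Because NodeStatus is an IntEnum, comparisons with the library's own members are assumed to work the same way.

```python
import enum


class NodeStatus(enum.IntEnum):
    """Local mirror of the values listed under testgres.enums.NodeStatus."""
    Running = 0
    Stopped = 1
    Uninitialized = 2


def ensure_running(node):
    """Bring a node to the Running state, initializing it first if necessary."""
    status = node.status()
    if status == NodeStatus.Uninitialized:
        node.init()
        status = NodeStatus.Stopped  # freshly initialized, not yet started
    if status == NodeStatus.Stopped:
        node.start()
    return node
```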
-
stop(params=[], wait=True)¶ Stop this node using pg_ctl.
Parameters: - params – additional arguments for pg_ctl.
- wait – wait until operation completes.
Returns: This instance of
PostgresNode.
-
subscribe(publication, name, dbname=None, username=None, **params)¶ Create subscription for logical replication
Parameters: - name – subscription name
- publication – publication object obtained from publish()
- dbname – database name
- username – replication username
- params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details)
-
version¶ Return PostgreSQL version for this node.
Returns: Instance of distutils.version.LooseVersion.
-
testgres.pubsub¶
Unlike physical replication, logical replication allows users to replicate only
specified databases and tables. It uses a publish-subscribe model with possibly
multiple publishers and multiple subscribers. When initializing the publisher’s
node, allow_logical=True should be passed to the PostgresNode.init()
method so that PostgreSQL writes the extra information needed by
logical replication to the WAL.
To replicate table X from node A to node B, the same table structure should
be defined on the subscriber’s node, because logical replication does not replicate DDL.
After that, the publish() and subscribe()
methods may be used to set up replication. Example:
>>> from testgres import get_new_node
>>> with get_new_node() as nodeA, get_new_node() as nodeB:
...     nodeA.init(allow_logical=True).start()
...     nodeB.init().start()
...
...     # create same table both on publisher and subscriber
...     create_table = 'create table test (a int, b int)'
...     nodeA.safe_psql(create_table)
...     nodeB.safe_psql(create_table)
...
...     # create publication
...     pub = nodeA.publish('mypub')
...     # create subscription
...     sub = nodeB.subscribe(pub, 'mysub')
...
...     # insert some data to the publisher's node
...     nodeA.execute('insert into test values (1, 1), (2, 2)')
...
...     # wait until changes apply on subscriber and check them
...     sub.catchup()
...
...     # read the data from subscriber's node
...     nodeB.execute('select * from test')
PostgresNode(name='...', port=..., base_dir='...')
PostgresNode(name='...', port=..., base_dir='...')
''
''
[(1, 1), (2, 2)]
-
class
testgres.node.Publication(name, node, tables=None, dbname=None, username=None)¶ -
__init__(name, node, tables=None, dbname=None, username=None)¶ Constructor. Use
PostgresNode.publish() instead of constructing publication objects directly.
Parameters: - name – publication name.
- node – publisher’s node.
- tables – tables list or None for all tables.
- dbname – database name used to connect and perform subscription.
- username – username used to connect to the database.
-
add_tables(tables, dbname=None, username=None)¶ Add tables to the publication. Cannot be used if publication was created with empty tables list.
Parameters: tables – a list of tables to be added to the publication.
-
drop(dbname=None, username=None)¶ Drop publication
-
-
class
testgres.node.Subscription(node, publication, name=None, dbname=None, username=None, **params)¶ -
__init__(node, publication, name=None, dbname=None, username=None, **params)¶ Constructor. Use
PostgresNode.subscribe() instead of constructing subscription objects directly.
Parameters: - name – subscription name.
- node – subscriber’s node.
- publication – Publication object we are subscribing to (see PostgresNode.publish()).
- dbname – database name used to connect and perform subscription.
- username – username used to connect to the database.
- params – subscription parameters (see documentation on CREATE SUBSCRIPTION for details).
-
catchup(username=None)¶ Wait until subscription catches up with publication.
Parameters: username – remote node’s user name.
-
disable(dbname=None, username=None)¶ Disables the running subscription.
-
drop(dbname=None, username=None)¶ Drops subscription
-
enable(dbname=None, username=None)¶ Enables the previously disabled subscription.
-
refresh(copy_data=True, dbname=None, username=None)¶ Refreshes the subscription so that it picks up changes to the publication (copy_data controls whether existing table data is copied).
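Publication.add_tables(), Subscription.refresh() and Subscription.catchup() are typically used together when a table is added to a publication that already has subscribers. The helper below is a sketch under assumptions: publish_more_tables is a hypothetical name, refresh() is assumed to make the subscriber pick up the newly published tables, and the new tables are assumed to exist on the subscriber already, since DDL is not replicated.

```python
def publish_more_tables(pub, sub, tables):
    """Add tables to a publication and bring one subscriber up to date."""
    # 1. Extend the publication on the publisher's side.
    pub.add_tables(tables)
    # 2. Let the subscription learn about the new tables and copy their rows.
    sub.refresh(copy_data=True)
    # 3. Block until the subscriber has applied all pending changes.
    sub.catchup()
```

As noted in the add_tables() description, this cannot be used if the publication was created with an empty tables list.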
-