# Copyright (c) 2021-2023, PostgreSQL Global Development Group # Test streaming of simple large transaction use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); $node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); $node_publisher->append_conf('postgresql.conf', 'logical_replication_mode = immediate'); $node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->append_conf('postgresql.conf', "log_min_messages = DEBUG1"); $node_subscriber->start; $node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab_2"); my $appname = 'tap_sub'; ################################ # Test using streaming mode 'on' ################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = parallel)" ); # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); $node_publisher->wait_for_catchup($appname); sleep 3; $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)"); # Test serializing changes to files and notify the parallel apply worker to # apply them at the end of the transaction. $node_subscriber->append_conf('postgresql.conf', 'logical_replication_mode = immediate'); # Reset the log_min_messages to default. $node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning"); $node_subscriber->reload; # Run a query to make sure that the reload has taken effect. $node_subscriber->safe_psql('postgres', q{SELECT 1}); my $offset = -s $node_subscriber->logfile; $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5) s(i)"); # Ensure that the changes are serialized. $node_subscriber->wait_for_log( qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/, $offset); $node_subscriber->stop; $node_publisher->stop; done_testing();