Skip to content

Commit 3a7400f

Browse files
committed
Fix primary_key regression test to align with Spock 5.0 behavior
Updated the test case to reflect current behavior where workers no longer quit on replication errors. With Spock 5.0, subscriptions are now disabled when exception_behaviour = 'SUB_DISABLE'. Also added a helper function to the init script to simplify setup of test scenarios.
1 parent 86d6c13 commit 3a7400f

File tree

5 files changed

+377
-67
lines changed

5 files changed

+377
-67
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ OBJS = spock_jsonb_utils.o spock_exception_handler.o spock_apply.o \
3333

3434
SCRIPTS_built = spock_create_subscriber
3535

36-
# FIXME: primary_key, triggers, row_filter, multiple_upstreams
36+
# FIXME: triggers, row_filter, multiple_upstreams
3737
REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \
38-
toasted replication_set matview bidirectional \
38+
toasted replication_set matview bidirectional primary_key \
3939
interfaces foreign_key copy sequence parallel \
4040
row_filter_sampling att_list column_filter apply_delay \
4141
node_origin_cascade drop

expected/init.out

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,74 @@ SELECT count(*) FROM pg_stat_replication;
121121
1
122122
(1 row)
123123

124+
-- fetch_last_xid(action)
--
-- Test helper: peeks (without consuming) at the changes pending on an
-- inactive logical replication slot and returns the xid of the first decoded
-- change whose JSON "action" field equals the requested action code.
--
-- Parameters:
--   action  one-character action code to look for (default 'U').
--
-- Returns:
--   The xid of the first matching change among the changes returned by
--   pg_logical_slot_peek_changes() (called with upto_nchanges = 10), or NULL
--   when none of them matches.
--
-- Raises:
--   An exception when no inactive logical slot for the current database
--   shows up within roughly 10 seconds.
CREATE OR REPLACE FUNCTION fetch_last_xid(action char DEFAULT 'U')
RETURNS xid
LANGUAGE plpgsql
AS $$
DECLARE
    remote_xid xid;
    slot TEXT;
BEGIN
    -- Wait up to 10 seconds (100 x 0.1s) for an inactive slot and remember
    -- which one it is. Peeking at a slot that is still in use by a walsender
    -- fails, so the slot is chosen with the same filter that is waited on;
    -- ORDER BY keeps the choice deterministic if several slots qualify.
    FOR attempt IN 1..100 LOOP
        SELECT s.slot_name
          INTO slot
          FROM pg_replication_slots AS s
         WHERE s.active = false
           AND s.slot_type = 'logical'
           AND s.database = current_database()
         ORDER BY s.slot_name
         LIMIT 1;

        EXIT WHEN slot IS NOT NULL;

        PERFORM pg_sleep(0.1);
    END LOOP;

    -- Fail loudly instead of handing a NULL or busy slot to the decoder.
    IF slot IS NULL THEN
        RAISE EXCEPTION 'no inactive logical replication slot found after waiting 10 seconds';
    END IF;

    -- Fetch the remote xid of the first pending change with the requested
    -- action. The changes are only peeked at, so they stay in the slot.
    SELECT changes.xid
      INTO remote_xid
      FROM pg_logical_slot_peek_changes(
               slot,
               NULL,
               10,
               'min_proto_version', '3',
               'max_proto_version', '4',
               'startup_params_format', '1',
               'proto_format', 'json',
               'spock.replication_set_names', 'default,ddl_sql'
           ) AS changes(lsn, xid, data)
     WHERE changes.data IS NOT NULL
       AND changes.data <> ''
       AND changes.data::json->>'action' = action
     LIMIT 1;

    RETURN remote_xid;
END;
$$;
165+
\c :subscriber_dsn
166+
-- skiplsn_and_enable_sub(sub_name, p_remote_xid)
--
-- Test helper: looks up the skip_lsn reported in spock.exception_log for the
-- given remote transaction, tells the subscription to skip that LSN, and then
-- re-enables the subscription.
--
-- Parameters:
--   sub_name      name of the subscription to alter and enable.
--   p_remote_xid  remote transaction id whose 'SUB_DISABLE' log entry carries
--                 the skip_lsn in its error message.
--
-- Raises:
--   An exception when no skip_lsn can be extracted for p_remote_xid.
CREATE OR REPLACE FUNCTION skiplsn_and_enable_sub(
    sub_name text,
    p_remote_xid bigint
)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    quoted_lsn  text;  -- skip_lsn, already wrapped as a SQL literal
    skip_stmt   text;
    enable_stmt text;
BEGIN
    -- Pull the LSN out of the logged error message; the scalar subquery
    -- yields NULL both when no row matches and when the pattern is absent.
    quoted_lsn := (
        SELECT quote_literal((regexp_match(el.error_message, 'skip_lsn = ([0-9A-F/]+)'))[1])
          FROM spock.exception_log AS el
         WHERE el.remote_xid = p_remote_xid
           AND el.operation = 'SUB_DISABLE'
         LIMIT 1
    );

    IF quoted_lsn IS NULL THEN
        RAISE EXCEPTION 'skip_lsn not found for remote_xid = %', p_remote_xid;
    END IF;

    -- Step 1: make the subscription skip the offending LSN.
    -- quoted_lsn is already a literal, hence %s rather than %L.
    skip_stmt := format('SELECT spock.sub_alter_skiplsn(%L, %s)', sub_name, quoted_lsn);
    EXECUTE skip_stmt;

    -- Step 2: switch the subscription back on.
    enable_stmt := format('SELECT spock.sub_enable(%L, true)', sub_name);
    EXECUTE enable_stmt;
END;
$$;

expected/primary_key.out

Lines changed: 146 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,25 @@
11
--PRIMARY KEY
22
SELECT * FROM spock_regress_variables()
33
\gset
4+
-- This is to ensure that the test runs with the correct configuration
5+
\c :provider_dsn
6+
ALTER SYSTEM SET spock.check_all_uc_indexes = true;
7+
ALTER SYSTEM SET spock.exception_behaviour = sub_disable;
8+
SELECT pg_reload_conf();
9+
pg_reload_conf
10+
----------------
11+
t
12+
(1 row)
13+
14+
\c :subscriber_dsn
15+
ALTER SYSTEM SET spock.check_all_uc_indexes = true;
16+
ALTER SYSTEM SET spock.exception_behaviour = sub_disable;
17+
SELECT pg_reload_conf();
18+
pg_reload_conf
19+
----------------
20+
t
21+
(1 row)
22+
423
\c :provider_dsn
524
-- testing update of primary key
625
-- create table with primary key and 3 other tables referencing it
@@ -200,7 +219,7 @@ SELECT * FROM pk_users ORDER BY id;
200219

201220
\c :provider_dsn
202221
\set VERBOSITY terse
203-
SELECT quote_literal(pg_current_xlog_location()) as curr_lsn
222+
SELECT quote_literal(pg_current_wal_lsn()) as curr_lsn
204223
\gset
205224
SELECT spock.replicate_ddl($$
206225
CREATE UNIQUE INDEX id_temp_idx ON public.pk_users (id);
@@ -240,10 +259,10 @@ SELECT attname, attnotnull, attisdropped from pg_attribute where attrelid = 'pk_
240259
address | f | f
241260
(5 rows)
242261

243-
SELECT spock.sub_disable('test_subscription', true);
244-
sub_disable
245-
-------------
246-
t
262+
select status from spock.sub_show_status();
263+
status
264+
----------
265+
disabled
247266
(1 row)
248267

249268
\c :provider_dsn
@@ -260,7 +279,8 @@ BEGIN
260279
END LOOP;
261280
END;
262281
$$;
263-
SELECT data::json->'action' as action, CASE WHEN data::json->>'action' IN ('I', 'D', 'U') THEN json_extract_path(data::json, 'relation') END as data FROM pg_logical_slot_get_changes((SELECT slot_name FROM pg_replication_slots), NULL, 1, 'min_proto_version', '1', 'max_proto_version', '1', 'startup_params_format', '1', 'proto_format', 'json', 'spock.replication_set_names', 'default,ddl_sql');
282+
SELECT data::json->'action' as action, CASE WHEN data::json->>'action' IN ('I', 'D', 'U') THEN json_extract_path(data::json, 'relation') END as data FROM pg_logical_slot_get_changes((SELECT slot_name FROM pg_replication_slots), NULL, 1, 'min_proto_version', '3', 'max_proto_version', '4', 'startup_params_format', '1', 'proto_format', 'json', 'spock.replication_set_names', 'default,ddl_sql')
283+
WHERE data IS NOT NULL AND data <> '';
264284
action | data
265285
--------+-------------------
266286
"S" |
@@ -269,19 +289,20 @@ SELECT data::json->'action' as action, CASE WHEN data::json->>'action' IN ('I',
269289
"C" |
270290
(4 rows)
271291

272-
SELECT data::json->'action' as action, CASE WHEN data::json->>'action' IN ('I', 'D', 'U') THEN data END as data FROM pg_logical_slot_get_changes((SELECT slot_name FROM pg_replication_slots), NULL, 1, 'min_proto_version', '1', 'max_proto_version', '1', 'startup_params_format', '1', 'proto_format', 'json', 'spock.replication_set_names', 'default,ddl_sql');
292+
SELECT data::json->'action' as action, CASE WHEN data::json->>'action' IN ('I', 'D', 'U') THEN data END as data FROM pg_logical_slot_get_changes((SELECT slot_name FROM pg_replication_slots), NULL, 1, 'min_proto_version', '3', 'max_proto_version', '4', 'startup_params_format', '1', 'proto_format', 'json', 'spock.replication_set_names', 'default,ddl_sql')
293+
WHERE data IS NOT NULL AND data <> '';
273294
action | data
274295
--------+------
275296
(0 rows)
276297

277298
\c :subscriber_dsn
299+
DELETE FROM pk_users WHERE id = 4;-- remove the offending entries.
278300
SELECT spock.sub_enable('test_subscription', true);
279301
sub_enable
280302
------------
281303
t
282304
(1 row)
283305

284-
DELETE FROM pk_users WHERE id = 4;-- remove the offending entries.
285306
\c :provider_dsn
286307
DO $$
287308
BEGIN
@@ -352,27 +373,26 @@ SELECT * FROM spock.repset_remove_table('default', 'pk_users');
352373
SELECT * FROM spock.repset_add_table('default', 'pk_users');
353374
ERROR: table pk_users cannot be added to replication set default
354375
ROLLBACK;
355-
-- Per 2ndQuadrant/spock_internal#146 this shouldn't be allowed, but
356-
-- currently is. Logical decoding will fail to capture this change and we
357-
-- won't progress with decoding.
358-
--
359-
-- This will get recorded by logical decoding with no 'oldkey' values,
360-
-- causing spock to fail to apply it with an error like
361376
--
362-
-- CONFLICT: remote UPDATE on relation public.pk_users (tuple not found). Resolution: skip.
377+
-- Previously this would cause an error on the subscriber:
378+
-- remote UPDATE on relation public.pk_users (tuple not found).
379+
-- In Spock 5.0, this is caught and the subscription is disabled when
380+
-- spock.exception_behaviour = sub_disable is set.
363381
--
364382
UPDATE pk_users SET id = 91 WHERE id = 90;
365-
-- Catchup will replay the insert and succeed, but the update
366-
-- will be lost.
367-
BEGIN;
368-
SET LOCAL statement_timeout = '2s';
369-
SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
370-
ERROR: canceling statement due to statement timeout
371-
ROLLBACK;
383+
-- fetch xid of the last 'U' action
384+
SELECT fetch_last_xid('U') AS remote_xid \gset
372385
-- To carry on we'll need to make the index on the downstream
373-
-- (which is odd, because logical decoding didn't capture the
374-
-- oldkey of the tuple, so how can we apply it?)
386+
-- We need to skip the last 'U' action that caused the subscription to be disabled.
387+
-- It's done by altering the subscription to skip the last 'U' action and the error
388+
-- message from the exception log is used to find the skip_lsn.
375389
\c :subscriber_dsn
390+
SELECT skiplsn_and_enable_sub('test_subscription', :remote_xid);
391+
skiplsn_and_enable_sub
392+
------------------------
393+
394+
(1 row)
395+
376396
ALTER TABLE public.pk_users
377397
ADD CONSTRAINT pk_users_pkey PRIMARY KEY (id) NOT DEFERRABLE;
378398
\c :provider_dsn
@@ -439,30 +459,26 @@ SELECT id FROM pk_users WHERE id IN (90, 91, 100, 101) ORDER BY id;
439459
100
440460
(2 rows)
441461

462+
SELECT spock.sub_show_status();
463+
sub_show_status
464+
---------------------------------------------------------------------------------------------------------------------------------------------------------------
465+
(test_subscription,disabled,test_provider,"dbname=regression user=super",spk_postgres_test_provider_test_sube55bf37,"{default,default_insert_only,ddl_sql}",)
466+
(1 row)
467+
442468
-- we can recover by re-creating the pk as non-deferrable
443469
ALTER TABLE public.pk_users DROP CONSTRAINT pk_users_pkey,
444470
ADD CONSTRAINT pk_users_pkey PRIMARY KEY (id) NOT DEFERRABLE;
445471
-- then replay. Toggle the subscription's enabled state
446472
-- to make it recover faster for a quicker test run.
447-
SELECT spock.sub_disable('test_subscription', true);
448-
sub_disable
449-
-------------
450-
t
451-
(1 row)
452-
473+
-- sub is already disabled due to the previous replication error
453474
SELECT spock.sub_enable('test_subscription', true);
454475
sub_enable
455476
------------
456477
t
457478
(1 row)
458479

459-
\c :provider_dsn
460-
SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
461-
wait_slot_confirm_lsn
462-
-----------------------
463-
464-
(1 row)
465-
480+
-- \c :provider_dsn
481+
-- SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
466482
\c :subscriber_dsn
467483
SELECT id FROM pk_users WHERE id IN (90, 91, 100, 101) ORDER BY id;
468484
id
@@ -481,6 +497,20 @@ SELECT id FROM pk_users WHERE id IN (90, 91, 100, 101) ORDER BY id;
481497
101
482498
(2 rows)
483499

500+
\c :provider_dsn
501+
-- fetch xid of the last 'U' action
502+
SELECT fetch_last_xid('U') AS remote_xid \gset
503+
-- To carry on we'll need to make the index on the downstream
504+
-- We need to skip the last 'U' action that caused the subscription to be disabled.
505+
-- It's done by altering the subscription to skip the last 'U' action and the error
506+
-- message from the exception log is used to find the skip_lsn.
507+
\c :subscriber_dsn
508+
SELECT skiplsn_and_enable_sub('test_subscription', :remote_xid);
509+
skiplsn_and_enable_sub
510+
------------------------
511+
512+
(1 row)
513+
484514
-- Demonstrate that we properly handle wide conflict rows
485515
\c :subscriber_dsn
486516
INSERT INTO pk_users (id, another_id, address)
@@ -495,6 +525,17 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
495525
(1 row)
496526

497527
\c :subscriber_dsn
528+
-- wait until subscription status updated.
529+
DO $$
BEGIN
    -- Poll up to 100 times, 0.1s apart (about 10 seconds), and stop as soon
    -- as a subscription is flagged as enabled. If none becomes enabled in
    -- time the block simply finishes without raising.
    FOR i IN 1..100 LOOP
        -- EXISTS yields a real boolean. Using a bare count() as the IF
        -- condition depends on coercing bigint to boolean, which only works
        -- for 0 and 1 and errors out once two or more rows match.
        IF EXISTS (SELECT 1 FROM spock.subscription WHERE sub_enabled = true) THEN
            RETURN;
        END IF;
        PERFORM pg_sleep(0.1);
    END LOOP;
END;
$$;
498539
SELECT id, another_id, left(address,30) AS address_abbrev
499540
FROM pk_users WHERE another_id = 2000;
500541
id | another_id | address_abbrev
@@ -574,20 +615,51 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
574615

575616
(1 row)
576617

618+
\c :subscriber_dsn
619+
SELECT spock.sub_show_status();
620+
sub_show_status
621+
------------------------------------------------------------------------------------------------------------------------------------------------------------------
622+
(test_subscription,replicating,test_provider,"dbname=regression user=super",spk_postgres_test_provider_test_sube55bf37,"{default,default_insert_only,ddl_sql}",)
623+
(1 row)
624+
625+
SELECT * FROM pk_users ORDER BY id;
626+
id | another_id | a_id | name | address
627+
----+------------+------+--------------+---------
628+
1 | 10 | 1 | should_error |
629+
2 | 20 | 0 | sub |
630+
3 | 11 | 1 | pub |
631+
4 | 22 | 1 | pub |
632+
(4 rows)
633+
577634
-- UPDATEs to missing rows could either resurrect the row or conclude it
578635
-- shouldn't exist and discard it. Currently spk unconditionally discards, so
579636
-- this row's name is a misnomer.
580637
\c :subscriber_dsn
638+
SELECT spock.sub_show_status();
639+
sub_show_status
640+
------------------------------------------------------------------------------------------------------------------------------------------------------------------
641+
(test_subscription,replicating,test_provider,"dbname=regression user=super",spk_postgres_test_provider_test_sube55bf37,"{default,default_insert_only,ddl_sql}",)
642+
(1 row)
643+
581644
DELETE FROM pk_users WHERE id = 4 AND another_id = 22;
582645
\c :provider_dsn
583646
UPDATE pk_users SET name = 'jesus' WHERE id = 4;
584-
SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
585-
wait_slot_confirm_lsn
586-
-----------------------
647+
-- fetch xid of the last 'U' action
648+
SELECT fetch_last_xid('U') AS remote_xid \gset
649+
--SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
650+
\c :subscriber_dsn
651+
SELECT spock.sub_show_status(); -- SUB is disabled due to the previous UPDATE
652+
sub_show_status
653+
---------------------------------------------------------------------------------------------------------------------------------------------------------------
654+
(test_subscription,disabled,test_provider,"dbname=regression user=super",spk_postgres_test_provider_test_sube55bf37,"{default,default_insert_only,ddl_sql}",)
655+
(1 row)
656+
657+
SELECT skiplsn_and_enable_sub('test_subscription', :remote_xid);
658+
skiplsn_and_enable_sub
659+
------------------------
587660

588661
(1 row)
589662

590-
\c :subscriber_dsn
591663
-- No resurrection here
592664
SELECT * FROM pk_users ORDER BY id;
593665
id | another_id | a_id | name | address
@@ -626,7 +698,15 @@ SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
626698
ERROR: canceling statement due to statement timeout
627699
ROLLBACK;
628700
-- This time we'll fix it by deleting the conflicting row
701+
-- Since SUB is disabled, DELETE the row on the subscriber
702+
-- and then re-enable the subscription.
629703
\c :subscriber_dsn
704+
SELECT spock.sub_show_status();
705+
sub_show_status
706+
---------------------------------------------------------------------------------------------------------------------------------------------------------------
707+
(test_subscription,disabled,test_provider,"dbname=regression user=super",spk_postgres_test_provider_test_sube55bf37,"{default,default_insert_only,ddl_sql}",)
708+
(1 row)
709+
630710
SELECT * FROM pk_users ORDER BY id;
631711
id | another_id | a_id | name | address
632712
----+------------+------+--------------+---------
@@ -638,6 +718,12 @@ SELECT * FROM pk_users ORDER BY id;
638718
(5 rows)
639719

640720
DELETE FROM pk_users WHERE id = 5;
721+
SELECT spock.sub_enable('test_subscription', true);
722+
sub_enable
723+
------------
724+
t
725+
(1 row)
726+
641727
\c :provider_dsn
642728
SELECT spock.wait_slot_confirm_lsn(NULL, NULL);
643729
wait_slot_confirm_lsn
@@ -666,3 +752,22 @@ NOTICE: drop cascades to table public.pk_users membership in replication set de
666752
t
667753
(1 row)
668754

755+
-- Reset the configuration to the default value
756+
\c :provider_dsn
757+
ALTER SYSTEM SET spock.check_all_uc_indexes = false;
758+
ALTER SYSTEM SET spock.exception_behaviour = transdiscard;
759+
SELECT pg_reload_conf();
760+
pg_reload_conf
761+
----------------
762+
t
763+
(1 row)
764+
765+
\c :subscriber_dsn
766+
ALTER SYSTEM SET spock.check_all_uc_indexes = false;
767+
ALTER SYSTEM SET spock.exception_behaviour = transdiscard;
768+
SELECT pg_reload_conf();
769+
pg_reload_conf
770+
----------------
771+
t
772+
(1 row)
773+

0 commit comments

Comments
 (0)