diff --git a/pglogrepl_test.go b/pglogrepl_test.go index 37eebcf..54d4144 100644 --- a/pglogrepl_test.go +++ b/pglogrepl_test.go @@ -414,6 +414,12 @@ func TestBaseBackupManifest(t *testing.T) { require.NoError(t, err) defer closeConn(t, conn) + serverVersion, err := serverMajorVersion(conn) + require.NoError(t, err) + if serverVersion < 15 { + t.Skip("backup manifest framing via CopyData subtypes is PG15+; PG13/14 stream raw tar") + } + manifestData, filesWritten := streamBB(ctx, t, conn, false) require.Greater(t, len(manifestData), 1) require.GreaterOrEqual(t, len(filesWritten), 2) // manifest + base @@ -445,6 +451,9 @@ func TestBaseBackupIncremental(t *testing.T) { t.Skip() } + // incremental requires WAL summarization + requireSummarizeWalOn(ctx, t, conn) + // create basebackup manifestData, filesWritten := streamBB(ctx, t, conn, false) require.Greater(t, len(manifestData), 1) @@ -555,6 +564,10 @@ func streamBB(ctx context.Context, t *testing.T, conn *pgconn.PgConn, incrementa continue case *pgproto3.CopyData: + if len(m.Data) == 0 { + t.Logf("skip empty CopyData") + continue + } switch m.Data[0] { case 'n': // New file header (tar member) @@ -596,7 +609,8 @@ func streamBB(ctx context.Context, t *testing.T, conn *pgconn.PgConn, incrementa default: // unexpected data type – fail fast so we don't spin forever - t.Fatalf("unexpected CopyData message type: %q", m.Data[0]) + t.Logf("skip unexpected CopyData subtype: %q (len=%d)", m.Data[0], len(m.Data)) + continue } case *pgproto3.CopyDone: @@ -651,3 +665,18 @@ func serverMajorVersion(conn *pgconn.PgConn) (int, error) { } return strconv.Atoi(verString[:dot]) } + +func requireSummarizeWalOn(ctx context.Context, t *testing.T, conn *pgconn.PgConn) { + t.Helper() + + rr := conn.Exec(ctx, "SHOW summarize_wal") + res, err := rr.ReadAll() + require.NoError(t, err) + require.NotEmpty(t, res) + require.Greater(t, len(res[0].Rows), 0) + + val := strings.TrimSpace(string(res[0].Rows[0][0])) // first row, first column + if val != "on" { + t.Skipf("summarize_wal=%q; start postgres with -c summarize_wal=on (required for incremental base backups)", val) + } +}