Skip to content

Commit c540823

Browse files
authored
Merge pull request #23 from liubao68/work
support logic replication
2 parents 88e6767 + 026df1c commit c540823

File tree

6 files changed

+32
-77
lines changed

6 files changed

+32
-77
lines changed

README.md

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -341,50 +341,17 @@ connection.createStatement("SELECT show_cities_multiple()").execute()
341341

342342
## Logical Decode
343343

344-
> This feature not implemented yet
345-
346-
PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.
347-
In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
348-
349-
Consuming the replication stream is a four-step process:
350-
351-
1. Obtain a replication connection via `PostgresqlConnectionFactory.replication()`.
352-
2. Create a replication slot (physical/logical).
353-
3. Initiate replication using the replication slot.
354-
4. Once the replication stream is set up, you can consume and map the binary data using `ReplicationStream.map(…)`.
355-
356-
On application shutdown, `close()` the `ReplicationStream`.
357-
358-
Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.
359-
360-
```java
361-
Mono<PostgresqlReplicationConnection> replicationMono = connectionFactory.replication();
362-
363-
// later:
364-
ReplicationSlotRequest request = ReplicationSlotRequest.logical()
365-
.slotName("my_slot")
366-
.outputPlugin("test_decoding")
367-
.temporary()
368-
.build();
369-
Mono<ReplicationSlot> createSlot = replicationConnection.createSlot(request);
370-
371-
ReplicationRequest replicationRequest = ReplicationRequest.logical()
372-
.slotName("my_slot")
373-
.startPosition(LogSequenceNumber.valueOf(0))
374-
.slotOption("skip-empty-xacts", true)
375-
.slotOption("include-xids", false)
376-
.build();
377-
378-
Flux<T> replicationStream = replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {
379-
return it.map(byteBuf -> {…})
380-
.doOnError(t -> it.close().subscribe());
381-
});
382-
```
344+
GaussDB allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.
345+
In GaussDB, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level,
346+
into an application-specific form such as a stream of tuples or SQL statements.
347+
348+
see https://github.com/HuaweiCloudDeveloper/gaussdb-r2dbc-examples for Logical Decode examples.
383349

384350
## GaussDB Enum Types
385351

386-
Applications may make use of Postgres enumerated types by using `EnumCodec` to map custom types to Java `enum` types.
387-
`EnumCodec` requires the Postgres OID and the Java to map enum values to the Postgres protocol and to materialize Enum instances from Postgres results.
352+
Applications may make use of GaussDB enumerated types by using `EnumCodec` to map custom types to Java `enum` types.
353+
`EnumCodec` requires the GaussDB OID and the Java to map enum values to the GaussDB protocol and to materialize Enum instances
354+
from GaussDB results.
388355
You can configure a `CodecRegistrar` through `EnumCodec.builder()` for one or more enumeration type mappings. Make sure to use different Java enum types otherwise the driver is not able to distinguish between Postgres OIDs.
389356

390357
Example:
@@ -406,7 +373,7 @@ enum MyEnumType {
406373
**Codec Registration:**
407374

408375
```java
409-
PostgresqlConnectionConfiguration.builder()
376+
GaussDBConnectionConfiguration.builder()
410377
.codecRegistrar(EnumCodec.builder().withEnum("my_enum",MyEnumType.class).build());
411378
```
412379

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,6 @@
264264
<version>3.12.1</version>
265265
<configuration>
266266
<compilerArgs>
267-
<arg>-Werror</arg>
268267
<arg>-Xlint:all</arg>
269268
<arg>-Xlint:-deprecation</arg>
270269
<arg>-Xlint:-options</arg>

src/main/java/io/r2dbc/gaussdb/GaussDBReplicationStream.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,11 @@ private boolean processKeepAliveMessage(ByteBuf buffer) {
141141
this.lastReceiveLSN = this.lastServerLSN;
142142
}
143143

144-
long lastServerClock = buffer.readLong();
145-
boolean replyRequired = buffer.readByte() != 0;
144+
buffer.readInt(); // serverMode
145+
buffer.readInt(); // dbState
146+
buffer.readLong(); // lastServerClock
146147

147-
return replyRequired;
148+
return buffer.readByte() != 0;
148149
}
149150

150151
private void processXLogData(ByteBuf buffer) {

src/main/java/io/r2dbc/gaussdb/KeepAliveMessage.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,13 @@ public ByteBuf encode(ByteBufAllocator allocator) {
5555
ByteBuf out = allocator.buffer(34);
5656

5757
out.writeByte(KEEP_ALIVE_REPLY);
58+
out.writeLong(Long.MAX_VALUE);
5859
out.writeLong(this.received.asLong());
5960
out.writeLong(this.flushed.asLong());
61+
out.writeLong(Long.MAX_VALUE);
6062
out.writeLong(this.applied.asLong());
63+
out.writeLong(Long.MAX_VALUE);
64+
out.writeLong(Long.MAX_VALUE);
6165
out.writeLong(this.systemClock);
6266

6367
if (this.replyRequired) {
@@ -66,6 +70,11 @@ public ByteBuf encode(ByteBufAllocator allocator) {
6670
out.writeByte(this.received == LogSequenceNumber.INVALID_LSN ? (byte) REPLY_REQUIRED : (byte) NO_REPLY_REQUIRED);
6771
}
6872

73+
out.writeInt(0);
74+
out.writeByte(1);
75+
out.writeByte(1);
76+
out.writeByte(1);
77+
6978
return out;
7079
}
7180

src/main/java/io/r2dbc/gaussdb/replication/ReplicationSlotRequest.java

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,9 @@ public abstract class ReplicationSlotRequest {
2828

2929
private final String slotName;
3030

31-
private final boolean temporary;
32-
33-
ReplicationSlotRequest(ReplicationType replicationType, String slotName, boolean temporary) {
31+
ReplicationSlotRequest(ReplicationType replicationType, String slotName) {
3432
this.replicationType = Assert.requireNonNull(replicationType, "replicationType must not be null");
3533
this.slotName = Assert.requireNotEmpty(slotName, "slotName must not be null");
36-
this.temporary = temporary;
3734
}
3835

3936
/**
@@ -79,33 +76,23 @@ String getSlotName() {
7976
return this.slotName;
8077
}
8178

82-
/**
83-
* Returns the slot is temporary.
84-
*
85-
* @return {@code true} if the slot should be temporary
86-
*/
87-
boolean isTemporary() {
88-
return this.temporary;
89-
}
90-
9179
/**
9280
* Slot creation request for logical replication.
9381
*/
9482
static class LogicalReplicationSlotRequest extends ReplicationSlotRequest {
9583

9684
private final String outputPlugin;
9785

98-
public LogicalReplicationSlotRequest(String slotName, boolean temporaryOption, String outputPlugin) {
99-
super(ReplicationType.LOGICAL, slotName, temporaryOption);
86+
public LogicalReplicationSlotRequest(String slotName, String outputPlugin) {
87+
super(ReplicationType.LOGICAL, slotName);
10088
this.outputPlugin = outputPlugin;
10189
}
10290

10391
@Override
10492
public String asSQL() {
10593
return String.format(
106-
"CREATE_REPLICATION_SLOT %s %s LOGICAL %s",
94+
"CREATE_REPLICATION_SLOT %s LOGICAL %s",
10795
getSlotName(),
108-
isTemporary() ? "TEMPORARY" : "",
10996
this.outputPlugin
11097
);
11198
}
@@ -115,7 +102,6 @@ public String toString() {
115102
return "LogicalReplicationSlotRequest{" +
116103
"slotName='" + getSlotName() + '\'' +
117104
", outputPlugin='" + this.outputPlugin + '\'' +
118-
", temporaryOption=" + isTemporary() +
119105
'}';
120106
}
121107

@@ -127,23 +113,21 @@ public String toString() {
127113
static class PhysicalReplicationSlotRequest extends ReplicationSlotRequest {
128114

129115
public PhysicalReplicationSlotRequest(String slotName, boolean temporaryOption) {
130-
super(ReplicationType.PHYSICAL, slotName, temporaryOption);
116+
super(ReplicationType.PHYSICAL, slotName);
131117
}
132118

133119
@Override
134120
public String asSQL() {
135121
return String.format(
136-
"CREATE_REPLICATION_SLOT %s %s PHYSICAL",
137-
getSlotName(),
138-
isTemporary() ? "TEMPORARY" : ""
122+
"CREATE_REPLICATION_SLOT %s PHYSICAL",
123+
getSlotName()
139124
);
140125
}
141126

142127
@Override
143128
public String toString() {
144129
return "PhysicalReplicationSlotRequest{" +
145130
"slotName='" + getSlotName() + '\'' +
146-
", temporaryOption=" + isTemporary() +
147131
'}';
148132
}
149133

@@ -155,8 +139,6 @@ final static class DefaultLogicalSlotRequestBuilder implements LogicalSlotReques
155139

156140
private String outputPlugin;
157141

158-
private boolean temporary;
159-
160142
@Override
161143
public LogicalSlotRequestBuilder slotName(String slotName) {
162144
this.slotName = Assert.requireNotEmpty(slotName, "slotName must not be null and not empty");
@@ -171,13 +153,12 @@ public LogicalSlotRequestBuilder outputPlugin(String outputPlugin) {
171153

172154
@Override
173155
public LogicalSlotRequestBuilder temporary() {
174-
this.temporary = true;
175156
return this;
176157
}
177158

178159
@Override
179160
public ReplicationSlotRequest build() {
180-
return new LogicalReplicationSlotRequest(this.slotName, this.temporary, this.outputPlugin);
161+
return new LogicalReplicationSlotRequest(this.slotName, this.outputPlugin);
181162
}
182163

183164
}

src/test/java/io/r2dbc/gaussdb/replication/ReplicationSlotRequestUnitTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,15 @@ void buildsLogicalRequest() {
4545
ReplicationSlotRequest request = ReplicationSlotRequest.logical().slotName("slot").outputPlugin("output").temporary().build();
4646
assertThat(request.getReplicationType()).isEqualTo(ReplicationType.LOGICAL);
4747
assertThat(request.getSlotName()).isEqualTo("slot");
48-
assertThat(request.isTemporary()).isTrue();
49-
assertThat(request.asSQL()).isEqualTo("CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL output");
48+
assertThat(request.asSQL()).isEqualTo("CREATE_REPLICATION_SLOT slot LOGICAL output");
5049
}
5150

5251
@Test
5352
void buildsPhysicalRequest() {
5453
ReplicationSlotRequest request = ReplicationSlotRequest.physical().slotName("slot").temporary().build();
5554
assertThat(request.getReplicationType()).isEqualTo(ReplicationType.PHYSICAL);
5655
assertThat(request.getSlotName()).isEqualTo("slot");
57-
assertThat(request.isTemporary()).isTrue();
58-
assertThat(request.asSQL()).isEqualTo("CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL");
56+
assertThat(request.asSQL()).isEqualTo("CREATE_REPLICATION_SLOT slot PHYSICAL");
5957
}
6058

6159
}

0 commit comments

Comments
 (0)