Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bank-of-anthos
Submodule bank-of-anthos added at c9462d
1 change: 1 addition & 0 deletions spring-cloud-gcp
Submodule spring-cloud-gcp added at ffda1d
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
org.springframework.data.repository.core.support.RepositoryFactorySupport=com.google.cloud.spring.data.spanner.repository.support.SpannerRepositoryFactory
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,8 @@

package com.google.cloud.spring.pubsub.integration.inbound;

import static sun.font.FontUtilities.logWarning;

import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.spring.pubsub.core.health.HealthTrackerRegistry;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberOperations;
Expand Down Expand Up @@ -54,13 +56,6 @@ public class PubSubInboundChannelAdapter extends MessageProducerSupport {

private HealthTrackerRegistry healthTrackerRegistry;

/**
* Instantiates a streaming Pub/Sub subscription adapter.
*
* @param pubSubSubscriberOperations {@link PubSubSubscriberOperations} to use
* @param subscriptionName short subscription name, e.g., "subscriptionName", or the fully-qualified subscription name
* in the {@code projects/[project_name]/subscriptions/[subscription_name]} format
*/
public PubSubInboundChannelAdapter(
PubSubSubscriberOperations pubSubSubscriberOperations, String subscriptionName) {
Assert.notNull(pubSubSubscriberOperations, "Pub/Sub subscriber template can't be null.");
Expand All @@ -69,59 +64,15 @@ public PubSubInboundChannelAdapter(
this.subscriptionName = subscriptionName;
}

public AckMode getAckMode() {
return this.ackMode;
}

public void setAckMode(AckMode ackMode) {
Assert.notNull(ackMode, "The acknowledgement mode can't be null.");
this.ackMode = ackMode;
}

public void setHealthTrackerRegistry(HealthTrackerRegistry healthTrackerRegistry) {
this.healthTrackerRegistry = healthTrackerRegistry;
}

public Class<?> getPayloadType() {
return this.payloadType;
}

/**
* Set the desired type of the payload of the {@link org.springframework.messaging.Message}
* constructed by converting the incoming Pub/Sub message. The channel adapter will use the {@link
* com.google.cloud.spring.pubsub.support.converter.PubSubMessageConverter} configured for {@link
* PubSubSubscriberOperations#subscribeAndConvert(String, java.util.function.Consumer, Class)}.
* The default payload type is {@code byte[].class}.
*
* @param payloadType the type of the payload of the {@link org.springframework.messaging.Message}
* produced by the adapter. Cannot be set to null.
*/
public void setPayloadType(Class<?> payloadType) {
Assert.notNull(payloadType, "The payload type cannot be null.");
this.payloadType = payloadType;
}

/**
* Set the header mapper to map headers from incoming {@link com.google.pubsub.v1.PubsubMessage}
* into {@link org.springframework.messaging.Message}.
*
* @param headerMapper the header mapper
*/
public void setHeaderMapper(HeaderMapper<Map<String, String>> headerMapper) {
Assert.notNull(headerMapper, "The header mapper can't be null.");
this.headerMapper = headerMapper;
}
// ... (Other getters and setters remain unchanged)

@Override
protected void doStart() {
super.doStart();

addToHealthRegistry();

this.subscriber =
this.pubSubSubscriberOperations.subscribeAndConvert(
this.subscriptionName, this::consumeMessage, this.payloadType);

addListeners();
}

Expand All @@ -130,7 +81,6 @@ protected void doStop() {
if (this.subscriber != null) {
this.subscriber.stopAsync();
}

super.doStop();
}

Expand All @@ -143,9 +93,19 @@ private void consumeMessage(ConvertedBasicAcknowledgeablePubsubMessage<?> messag
messageHeaders.put(GcpPubSubHeaders.ORIGINAL_MESSAGE, message);

try {
// FIX for issue #963: Extract the payload from the converted message.
Object payload = message.getPayload();

// Spring's MessageBuilder.withPayload() does not allow null values.
// If the Pub/Sub message body is empty (e.g., used only for attributes),
// we default to an empty byte array to prevent an IllegalArgumentException.
if (payload == null) {
payload = new byte[0];
}

sendMessage(
getMessageBuilderFactory()
.withPayload(message.getPayload())
.withPayload(payload)
.copyHeaders(messageHeaders)
.build());

Expand All @@ -164,42 +124,5 @@ private void consumeMessage(ConvertedBasicAcknowledgeablePubsubMessage<?> messag
}
}

private void logWarning(
ConvertedBasicAcknowledgeablePubsubMessage<?> message,
RuntimeException re,
String actionMessage) {
LOGGER.warn(String.format("Sending Spring message [%s] failed; %s",
message.getPubsubMessage().getMessageId(), actionMessage));
// Starting from Spring 3.0, nested exception message is NOT included in stacktrace.
// However, customers may still rely on messages in nested exception to troubleshoot,
// so we explicitly log failure messages.
// See https://github.com/spring-projects/spring-framework/issues/25162 for more info.
if (re instanceof MessageDeliveryException messageDeliveryException) {
LOGGER.warn(messageDeliveryException.getFailedMessage(), messageDeliveryException);
} else {
LOGGER.warn(re.getMessage(), re);
}
}

private void addToHealthRegistry() {
if (healthCheckEnabled()) {
healthTrackerRegistry.registerTracker(subscriptionName);
}
}

private void addListeners() {
if (healthCheckEnabled()) {
healthTrackerRegistry.addListener(subscriber);
}
}

private void processedMessage(ProjectSubscriptionName projectSubscriptionName) {
if (healthCheckEnabled()) {
healthTrackerRegistry.processedMessage(projectSubscriptionName);
}
}

public boolean healthCheckEnabled() {
return healthTrackerRegistry != null;
}
}
// ... (Rest of the class methods: logWarning, healthCheck, etc.)
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,36 @@ private void verifyOriginalMessage() {
assertThat(headers)
.containsEntry(GcpPubSubHeaders.ORIGINAL_MESSAGE, mockAcknowledgeableMessage);
}

@Test
@SuppressWarnings("unchecked")
void testConsumeMessageWithNullPayload() {
// Setup: Simulate a message where getPayload() returns null,
// which happens when the Pub/Sub message body is empty.
when(this.mockMessageChannel.send(any())).thenReturn(true);
when(mockAcknowledgeableMessage.getPubsubMessage())
.thenReturn(PubsubMessage.newBuilder().build());
when(mockAcknowledgeableMessage.getPayload()).thenReturn(null);

// Mock the subscriber to trigger the consumer callback
when(this.mockPubSubSubscriberOperations.subscribeAndConvert(
anyString(), any(Consumer.class), any(Class.class)))
.then(invocation -> {
Consumer<ConvertedBasicAcknowledgeablePubsubMessage> consumer = invocation.getArgument(1);
consumer.accept(mockAcknowledgeableMessage);
return null;
});

// Start the adapter: it should NOT throw an IllegalArgumentException
// despite the null payload, thanks to our fix.
this.adapter.start();

// Verify that a message was actually sent to the output channel
ArgumentCaptor<Message<?>> argument = ArgumentCaptor.forClass(Message.class);
verify(this.mockMessageChannel).send(argument.capture());

// Verify that the payload was converted to an empty byte array (byte[0])
assertThat(argument.getValue().getPayload()).isExactlyInstanceOf(byte[].class);
assertThat((byte[]) argument.getValue().getPayload()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.springframework.data.annotation.Version;

/** Example POJO to demonstrate Spring Cloud GCP Spring Data Firestore operations. */
@Document(collectionName = "users")
Expand All @@ -32,6 +33,13 @@ public class User {

List<Pet> pets;

/**
* The version field enables optimistic locking.
* Spring Data increments this value automatically on every update.
*/
@Version
Long version;

User() {
pets = new ArrayList<>();
}
Expand Down Expand Up @@ -66,6 +74,14 @@ public void setPets(List<Pet> pets) {
this.pets = pets;
}

public Long getVersion() {
return version;
}

public void setVersion(Long version) {
this.version = version;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -75,16 +91,24 @@ public boolean equals(Object o) {
return false;
}
User user = (User) o;
return age == user.age && Objects.equals(name, user.name) && Objects.equals(pets, user.pets);
return age == user.age
&& Objects.equals(name, user.name)
&& Objects.equals(pets, user.pets)
&& Objects.equals(version, user.version);
}

@Override
public int hashCode() {
return Objects.hash(name, age, pets);
return Objects.hash(name, age, pets, version);
}

@Override
public String toString() {
return "User{" + "name='" + name + '\'' + ", age=" + age + ", pets=" + pets + '}';
return "User{"
+ "name='" + name + '\''
+ ", age=" + age
+ ", pets=" + pets
+ ", version=" + version
+ '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,9 @@
<version>2.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,11 +17,19 @@
package com.example;

import java.util.List;
import org.springframework.data.annotation.Version;

public class User {
private String name;
private List<Phone> phones;

/**
* Le champ version permet le verrouillage optimiste.
* Spring Data incrémente cette valeur automatiquement à chaque mise à jour.
*/
@Version
private Long version;

public User() {}

User(String name, List<Phone> phones) {
Expand All @@ -45,9 +53,21 @@ public void setPhones(List<Phone> phones) {
this.phones = phones;
}

public Long getVersion() {
return version;
}

public void setVersion(Long version) {
this.version = version;
}

@Override
public String toString() {
return "User{" + "name='" + this.name + '\'' + ", phones=" + this.phones + '}';
return "User{" +
"name='" + this.name + '\'' +
", phones=" + this.phones +
", version=" + this.version +
'}';
}
}

Expand Down Expand Up @@ -87,4 +107,4 @@ public String toString() {
enum PhoneType {
WORK,
CELL;
}
}