Skip to content

Commit d14c5f8

Browse files
committed
Refactor CloudEvents package structure
Remove v1 subpackage and flatten the CloudEvents package hierarchy. Introduce strategy pattern for format conversion to replace enum-based approach, improving extensibility and reduce dependencies. Key changes: - Move all classes from cloudevents.v1 to cloudevents base package - Remove optional format dependencies (JSON, XML, Avro) from build - Replace `ConversionType` enum with `FormatStrategy` interface - Add `CloudEventMessageFormatStrategy` as default implementation - Inline `HeaderPatternMatcher` logic into `ToCloudEventTransformerExtensions` - Add `@NullMarked` package annotations and `@Nullable` throughout - Document `targetClass` parameter behavior in `CloudEventMessageConverter` - Split transformer tests for better organization and coverage - Update component type identifier to "ce:to-cloudevents-transformer" - Remove unnecessary docs from package-info
1 parent ffb5fbe commit d14c5f8

26 files changed

+639
-1085
lines changed

build.gradle

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -474,20 +474,10 @@ project('spring-integration-cassandra') {
474474
}
475475

476476
project('spring-integration-cloudevents') {
477-
description = 'Spring Integration Cloud Events Support'
477+
description = 'Spring Integration CloudEvents Support'
478478

479479
dependencies {
480480
api "io.cloudevents:cloudevents-core:$cloudEventsVersion"
481-
optionalApi "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion"
482-
483-
optionalApi("io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion") {
484-
exclude group: 'org.apache.avro', module: 'avro'
485-
}
486-
optionalApi "org.apache.avro:avro:$avroVersion"
487-
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion"
488-
optionalApi 'org.jspecify:jspecify'
489-
490-
testImplementation 'org.springframework.amqp:spring-rabbit-test'
491481
}
492482
}
493483

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1;
17+
package org.springframework.integration.cloudevents;
1818

1919
import java.nio.charset.StandardCharsets;
2020

@@ -25,6 +25,7 @@
2525
import io.cloudevents.core.message.MessageReader;
2626
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
2727
import io.cloudevents.core.message.impl.MessageUtils;
28+
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.messaging.Message;
3031
import org.springframework.messaging.MessageHeaders;
@@ -42,7 +43,7 @@
4243
*/
4344
public class CloudEventMessageConverter implements MessageConverter {
4445

45-
private String cePrefix;
46+
private final String cePrefix;
4647

4748
public CloudEventMessageConverter(String cePrefix) {
4849
this.cePrefix = cePrefix;
@@ -52,28 +53,35 @@ public CloudEventMessageConverter() {
5253
this(CloudEventsHeaders.CE_PREFIX);
5354
}
5455

56+
/**
57+
Convert the payload of a Message from a CloudEvent to a typed Object of the specified target class.
58+
If the converter does not support the specified media type or cannot perform the conversion, it should return null.
59+
* @param message the input message
60+
* @param targetClass This method does not check the class since it is expected to be a {@link CloudEvent}
61+
* @return the result of the conversion, or null if the converter cannot perform the conversion
62+
*/
5563
@Override
5664
public Object fromMessage(Message<?> message, Class<?> targetClass) {
57-
Assert.state(CloudEvent.class.isAssignableFrom(targetClass), "Target class must be a CloudEvent");
5865
return createMessageReader(message).toEvent();
5966
}
6067

6168
@Override
62-
public Message<?> toMessage(Object payload, MessageHeaders headers) {
69+
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
6370
Assert.state(payload instanceof CloudEvent, "Payload must be a CloudEvent");
71+
Assert.state(headers != null, "Headers must not be null");
6472
return CloudEventUtils.toReader((CloudEvent) payload).read(new MessageBuilderMessageWriter(headers, this.cePrefix));
6573
}
6674

6775
private MessageReader createMessageReader(Message<?> message) {
68-
return MessageUtils.parseStructuredOrBinaryMessage(//
69-
() -> contentType(message.getHeaders()), //
70-
format -> structuredMessageReader(message, format), //
71-
() -> version(message.getHeaders()), //
72-
version -> binaryMessageReader(message, version) //
76+
return MessageUtils.parseStructuredOrBinaryMessage(
77+
() -> contentType(message.getHeaders()),
78+
format -> structuredMessageReader(message, format),
79+
() -> version(message.getHeaders()),
80+
version -> binaryMessageReader(message, version)
7381
);
7482
}
7583

76-
private String version(MessageHeaders message) {
84+
private @Nullable String version(MessageHeaders message) {
7785
if (message.containsKey(CloudEventsHeaders.SPEC_VERSION)) {
7886
return message.get(CloudEventsHeaders.SPEC_VERSION).toString();
7987
}
@@ -88,7 +96,7 @@ private MessageReader structuredMessageReader(Message<?> message, EventFormat fo
8896
return new GenericStructuredMessageReader(format, getBinaryData(message));
8997
}
9098

91-
private String contentType(MessageHeaders message) {
99+
private @Nullable String contentType(MessageHeaders message) {
92100
if (message.containsKey(MessageHeaders.CONTENT_TYPE)) {
93101
return message.get(MessageHeaders.CONTENT_TYPE).toString();
94102
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1;
17+
package org.springframework.integration.cloudevents;
1818

1919
/**
2020
* Constants for Cloud Events header names.
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1;
17+
package org.springframework.integration.cloudevents;
1818

1919
import java.util.Map;
2020
import java.util.function.BiConsumer;
@@ -23,6 +23,7 @@
2323
import io.cloudevents.core.data.BytesCloudEventData;
2424
import io.cloudevents.core.impl.StringUtils;
2525
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
26+
import org.jspecify.annotations.Nullable;
2627

2728
/**
2829
* Utility for converting maps (message headers) to `CloudEvent` contexts.
@@ -34,11 +35,12 @@
3435
*
3536
*/
3637
public class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
38+
3739
private final String cePrefix;
3840

3941
private final Map<String, Object> headers;
4042

41-
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, byte[] payload, String cePrefix) {
43+
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, byte @Nullable [] payload, String cePrefix) {
4244
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
4345
this.headers = headers;
4446
this.cePrefix = cePrefix;
@@ -55,7 +57,7 @@ protected boolean isContentTypeHeader(String key) {
5557

5658
@Override
5759
protected boolean isCloudEventsHeader(String key) {
58-
return key != null && key.length() > this.cePrefix.length() && StringUtils.startsWithIgnoreCase(key, this.cePrefix);
60+
return key.length() > this.cePrefix.length() && StringUtils.startsWithIgnoreCase(key, this.cePrefix);
5961
}
6062

6163
@Override
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1;
17+
package org.springframework.integration.cloudevents;
1818

1919
import java.util.HashMap;
2020
import java.util.Map;
@@ -26,6 +26,7 @@
2626
import io.cloudevents.rw.CloudEventContextWriter;
2727
import io.cloudevents.rw.CloudEventRWException;
2828
import io.cloudevents.rw.CloudEventWriter;
29+
import org.jspecify.annotations.Nullable;
2930

3031
import org.springframework.messaging.Message;
3132
import org.springframework.messaging.support.MessageBuilder;
@@ -62,7 +63,7 @@ public Message<byte[]> setEvent(EventFormat format, byte[] value) throws CloudEv
6263
}
6364

6465
@Override
65-
public Message<byte[]> end(CloudEventData value) throws CloudEventRWException {
66+
public Message<byte[]> end(@Nullable CloudEventData value) throws CloudEventRWException {
6667
return MessageBuilder.withPayload(value == null ? new byte[0] : value.toBytes()).copyHeaders(this.headers).build();
6768
}
6869

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
@org.jspecify.annotations.NullMarked
3+
package org.springframework.integration.cloudevents;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1.transformer;
17+
package org.springframework.integration.cloudevents.transformer;
1818

1919
import java.net.URI;
2020
import java.time.OffsetDateTime;
2121

2222
import org.jspecify.annotations.Nullable;
2323

24-
import org.springframework.integration.cloudevents.v1.CloudEventsHeaders;
24+
import org.springframework.integration.cloudevents.CloudEventsHeaders;
2525

2626
/**
2727
* Configuration properties for CloudEvent metadata and formatting.
Lines changed: 13 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,20 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.cloudevents.v1.transformer;
18-
19-
import java.util.HashMap;
20-
import java.util.Map;
21-
import java.util.Objects;
17+
package org.springframework.integration.cloudevents.transformer;
2218

2319
import io.cloudevents.CloudEvent;
24-
import io.cloudevents.avro.compact.AvroCompactFormat;
2520
import io.cloudevents.core.builder.CloudEventBuilder;
26-
import io.cloudevents.jackson.JsonFormat;
27-
import io.cloudevents.xml.XMLFormat;
2821
import org.jspecify.annotations.Nullable;
2922

30-
import org.springframework.integration.cloudevents.v1.CloudEventMessageConverter;
31-
import org.springframework.integration.cloudevents.v1.transformer.utils.HeaderPatternMatcher;
23+
import org.springframework.integration.cloudevents.CloudEventMessageConverter;
24+
import org.springframework.integration.cloudevents.CloudEventsHeaders;
25+
import org.springframework.integration.cloudevents.transformer.strategies.CloudEventMessageFormatStrategy;
26+
import org.springframework.integration.cloudevents.transformer.strategies.FormatStrategy;
3227
import org.springframework.integration.transformer.AbstractTransformer;
3328
import org.springframework.messaging.Message;
3429
import org.springframework.messaging.MessageHeaders;
35-
import org.springframework.messaging.converter.MessageConversionException;
3630
import org.springframework.messaging.converter.MessageConverter;
37-
import org.springframework.messaging.support.MessageBuilder;
38-
import org.springframework.util.Assert;
3931

4032
/**
4133
* A Spring Integration transformer that converts messages to CloudEvent format.
@@ -62,24 +54,11 @@
6254
*/
6355
public class ToCloudEventTransformer extends AbstractTransformer {
6456

65-
/**
66-
* Enumeration of supported CloudEvent conversion types.
67-
* <p>
68-
* Defines the different output formats supported by the transformer:
69-
* <ul>
70-
* <li>DEFAULT - No format conversion, uses standard CloudEvent message structure</li>
71-
* <li>XML - Serializes CloudEvent as XML in the message payload</li>
72-
* <li>JSON - Serializes CloudEvent as JSON in the message payload</li>
73-
* <li>AVRO - Serializes CloudEvent as compact Avro binary in the message payload</li>
74-
* </ul>
75-
*/
76-
public enum ConversionType { DEFAULT, XML, JSON, AVRO }
77-
7857
private final MessageConverter messageConverter;
7958

8059
private final @Nullable String cloudEventExtensionPatterns;
8160

82-
private final ConversionType conversionType;
61+
private final FormatStrategy formatStrategy;
8362

8463
private final CloudEventProperties cloudEventProperties;
8564

@@ -90,19 +69,20 @@ public enum ConversionType { DEFAULT, XML, JSON, AVRO }
9069
* supports wildcards and negation with '!' prefix If a header matches one of the '!' it is excluded from
9170
* cloud event headers and the message headers. If a header does not match for a prefix or a exclusion, the header
9271
* is left in the message headers. . Null to disable extension mapping.
93-
* @param conversionType the output format for the CloudEvent (DEFAULT, XML, JSON, or AVRO)
72+
* @param formatStrategy The strategy that determines how the CloudEvent will be rendered
9473
* @param cloudEventProperties configuration properties for CloudEvent metadata (id, source, type, etc.)
9574
*/
9675
public ToCloudEventTransformer(@Nullable String cloudEventExtensionPatterns,
97-
ConversionType conversionType, CloudEventProperties cloudEventProperties) {
76+
FormatStrategy formatStrategy, CloudEventProperties cloudEventProperties) {
9877
this.messageConverter = new CloudEventMessageConverter(cloudEventProperties.getCePrefix());
9978
this.cloudEventExtensionPatterns = cloudEventExtensionPatterns;
100-
this.conversionType = conversionType;
79+
this.formatStrategy = formatStrategy;
10180
this.cloudEventProperties = cloudEventProperties;
10281
}
10382

10483
public ToCloudEventTransformer() {
105-
this(null, ConversionType.DEFAULT, new CloudEventProperties());
84+
this(null, new CloudEventMessageFormatStrategy(CloudEventsHeaders.CE_PREFIX),
85+
new CloudEventProperties());
10686
}
10787

10888
/**
@@ -135,75 +115,7 @@ protected Object doTransform(Message<?> message) {
135115
.withData(getPayloadAsBytes(message.getPayload()))
136116
.withExtension(extensions)
137117
.build();
138-
139-
switch (this.conversionType) {
140-
case XML:
141-
return convertToXmlMessage(cloudEvent, message.getHeaders());
142-
case JSON:
143-
return convertToJsonMessage(cloudEvent, message.getHeaders());
144-
case AVRO:
145-
return convertToAvroMessage(cloudEvent, message.getHeaders());
146-
default:
147-
var result = this.messageConverter.toMessage(cloudEvent, filterHeaders(message.getHeaders()));
148-
Assert.state(result != null, "Payload result must not be null");
149-
return result;
150-
}
151-
}
152-
153-
private Message<String> convertToXmlMessage(CloudEvent cloudEvent, MessageHeaders originalHeaders) {
154-
XMLFormat xmlFormat = new XMLFormat();
155-
String xmlContent = new String(xmlFormat.serialize(cloudEvent));
156-
return buildStringMessage(xmlContent, originalHeaders, "application/xml");
157-
}
158-
159-
private Message<String> convertToJsonMessage(CloudEvent cloudEvent, MessageHeaders originalHeaders) {
160-
JsonFormat jsonFormat = new JsonFormat();
161-
String jsonContent = new String(jsonFormat.serialize(cloudEvent));
162-
return buildStringMessage(jsonContent, originalHeaders, "application/json");
163-
}
164-
165-
private Message<String> buildStringMessage(String serializedCloudEvent,
166-
MessageHeaders originalHeaders, String contentType) {
167-
try {
168-
return MessageBuilder.withPayload(serializedCloudEvent)
169-
.copyHeaders(filterHeaders(originalHeaders))
170-
.setHeader("content-type", contentType)
171-
.build();
172-
}
173-
catch (Exception e) {
174-
throw new MessageConversionException("Failed to convert CloudEvent to " + contentType, e);
175-
}
176-
}
177-
178-
private Message<byte[]> convertToAvroMessage(CloudEvent cloudEvent, MessageHeaders originalHeaders) {
179-
try {
180-
AvroCompactFormat avroFormat = new AvroCompactFormat();
181-
byte[] avroBytes = avroFormat.serialize(cloudEvent);
182-
return MessageBuilder.withPayload(avroBytes)
183-
.copyHeaders(filterHeaders(originalHeaders))
184-
.setHeader("content-type", "application/avro")
185-
.build();
186-
}
187-
catch (Exception e) {
188-
throw new RuntimeException("Failed to convert CloudEvent to application/avro", e);
189-
}
190-
}
191-
192-
/**
193-
* This method creates a {@link MessageHeaders} that were not placed in the CloudEvent and were not excluded via the
194-
* categorization mechanism.
195-
* @param headers The {@link MessageHeaders} to be filtered.
196-
* @return {@link MessageHeaders} that have been filtered.
197-
*/
198-
private MessageHeaders filterHeaders(MessageHeaders headers) {
199-
200-
Map<String, Object> filteredHeaders = new HashMap<>();
201-
headers.keySet().forEach(key -> {
202-
if (HeaderPatternMatcher.categorizeHeader(key, this.cloudEventExtensionPatterns) == null) {
203-
filteredHeaders.put(key, Objects.requireNonNull(headers.get(key)));
204-
}
205-
});
206-
return new MessageHeaders(filteredHeaders);
118+
return this.formatStrategy.convert(cloudEvent, new MessageHeaders(extensions.getFilteredHeaders()));
207119
}
208120

209121
private byte[] getPayloadAsBytes(Object payload) {
@@ -220,7 +132,7 @@ else if (payload instanceof String stringPayload) {
220132

221133
@Override
222134
public String getComponentType() {
223-
return "to-cloud-transformer";
135+
return "ce:to-cloudevents-transformer";
224136
}
225137

226138
}

0 commit comments

Comments
 (0)