From 24dc656bac091413d0d69b010e2de4ea3e03a32a Mon Sep 17 00:00:00 2001 From: Walter Schultz Date: Tue, 21 May 2024 12:55:04 -0400 Subject: [PATCH 1/3] Added kafka spring binder --- docs/user/spring/stream_binder.rst | 182 ++++++++++++++++++ .../pom.xml | 66 +++++++ .../KafkaDatastoreBinderConfiguration.java | 75 ++++++++ ...atastoreBinderConfigurationProperties.java | 28 +++ .../KafkaDatastoreBinderProvisioner.java | 52 +++++ .../KafkaDatastoreMessageBinder.java | 111 +++++++++++ .../KafkaDatastoreMessageProducer.java | 70 +++++++ .../converters/SimpleFeatureConverter.java | 54 ++++++ .../main/resources/META-INF/spring.binders | 2 + .../KafkaDatastoreMessageBinderTest.java | 168 ++++++++++++++++ .../src/test/resources/observation.conf | 7 + .../src/test/resources/reference.conf | 1 + geomesa-spring/pom.xml | 45 +++++ pom.xml | 1 + 14 files changed, 862 insertions(+) create mode 100644 docs/user/spring/stream_binder.rst create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf create mode 100644 geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf create mode 100644 geomesa-spring/pom.xml diff --git a/docs/user/spring/stream_binder.rst b/docs/user/spring/stream_binder.rst new file mode 100644 index 000000000000..be5cf896d9da --- /dev/null +++ b/docs/user/spring/stream_binder.rst @@ -0,0 +1,182 @@ +Spring Cloud Stream Geomesa Kafka Datastore Binder +================================================== + +The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into +Geomesa Kafka Datastore to process events. + +If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: +https://spring.io/projects/spring-cloud-stream + +Input/Output Types +------------ + +This binder will provide all `KafkaFeatureEvent`s from kafka datastore to your configured function definitions. Each +function will have to do it's own type comparision to see if the event is a `KafkaFeatureEvent.KafkaFeatureChanged`, +`KafkaFeatureEvent.KafkaFeatureRemoved`, or another event type. + +The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume +or produces `SimpleFeature`s and avoid working with `KafkaFeatureEvent`s directly. + +Note: The SimpleFeature converter extracts the SimpleFeature out of `KafkaFeatureEvent.KafkaFeatureChanged` events and +ignores all others. Any function definition that consumes SimpleFeatures will miss the +`KafkaFeatureEvent.KafkaFeatureRemoved` and ``KafkaFeatureEvent.KafkaFeatureCleared` messages. And any function +definition that only writes SimpleFeatures will not be able to send those messages. + +Configuration +------------- + +The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any +configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). +For example, to specify kafka.catalog.topic for the binder, set: +```yaml +spring: + cloud: + stream: + kafka-datastore: + binder: + kafka-catalog-topic: geomesa-catalog-topic +``` + +For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html + +Examples +-------- + +Simple Logger App +----------------- +``` +@Bean +public Consumer log() { + return obj -> logger.info(obj.toString()); +} +``` +``` +spring: + cloud: + function: + definition: log + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + log-in-0: input + bindings: + input: + destination: messages + group: logger +``` + +Simple Enricher App +------------------- + +``` +@Bean +public Function attachSourceField() { + return sf -> { + sf.setAttribute("source", "un-labelled source"); + return sf; + }; +} +``` +``` +spring: + cloud: + function: + definition: attachSourceField + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + attachSourceField-in-0: input + attachSourceField-out-0: output + bindings: + input: + destination: un-labelled-source-ob + group: sft-reader + output: + destination: observations + group: sft-writer +``` + +Simple Filter App +------------------- +``` +@Bean +public Function excludeMoving() { + return sf -> { + if (sf.getAttribute("status").equals("IN_TRANSIT")) { + return null; + } + return sf; + }; +} +``` +``` +spring: + cloud: + function: + definition: filterMoving + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + filterMoving-in-0: input + filterMoving-out-0: output + bindings: + input: + destination: movingAndUnmovingThings + group: sft-reader + output: + destination: unMovingThings + group: sft-writer +``` + +Multiple Datastore App +---------------------- + +In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment +field. + +``` +@Bean +public Function passThrough() { + return event -> event; +} +``` +``` +spring: + cloud: + function: + definition: passThrough + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + passThrough-in-0: input + passThrough-out-0: output + binders: + kds-start: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/start + kds-end: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/end + bindings: + input: + destination: observations + group: sft-reader + binder: kds-start + output: + destination: observations + group: sft-writer + binder: kds-end + +``` \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml new file mode 100644 index 000000000000..7a1ed6bf131e --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + org.locationtech.geomesa + geomesa-spring + 5.1.0-SNAPSHOT + + + geomesa-spring-cloud-stream-binder-kafka-datastore + + + 11 + 11 + UTF-8 + + + + + org.locationtech.geomesa + geomesa-kafka-datastore_2.12 + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.boot + spring-boot-starter-log4j2 + + + + org.apache.curator + curator-client + + + log4j + log4j + + + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.apache.logging.log4j + log4j-to-slf4j + + + + + + \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java new file mode 100644 index 000000000000..dfd5cf4db7bb --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java @@ -0,0 +1,75 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.DataStoreFinder; +import org.locationtech.geomesa.spring.binder.kafka.datastore.converters.SimpleFeatureConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +@Configuration +@Import({ PropertyPlaceholderAutoConfiguration.class }) +@EnableConfigurationProperties({KafkaDatastoreBinderConfigurationProperties.class}) +public class KafkaDatastoreBinderConfiguration { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreBinderConfiguration.class); + + @Autowired + KafkaDatastoreBinderConfigurationProperties kafkaDatastoreBinderConfigurationProperties; + + @Bean + @ConditionalOnMissingBean + public Supplier dsFactory() { + return () -> { + Map inParameters = new HashMap<>(); + kafkaDatastoreBinderConfigurationProperties.getBinder() + .forEach((key, value) -> inParameters.put(key.replace('-', '.'), value)); + logger.info("Binder config: {}", kafkaDatastoreBinderConfigurationProperties.getBinder()); + logger.info("Connecting to the KDS with params: {}", inParameters); + + try { + return DataStoreFinder.getDataStore(inParameters); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + + @Bean + @ConditionalOnMissingBean + public KafkaDatastoreBinderProvisioner kafkaDatastoreBinderProvisioner() { + return new KafkaDatastoreBinderProvisioner(); + } + + @Bean + @ConditionalOnMissingBean + public KafkaDatastoreMessageBinder kafkaDatastoreMessageBinder(KafkaDatastoreBinderProvisioner kafkaDatastoreBinderProvisioner) { + return new KafkaDatastoreMessageBinder(null, kafkaDatastoreBinderProvisioner, dsFactory()); + } + + @Bean + @ConditionalOnMissingBean + public SimpleFeatureConverter simpleFeatureConverter() { + return new SimpleFeatureConverter(); + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java new file mode 100644 index 000000000000..e466ef9105ea --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java @@ -0,0 +1,28 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore") +public class KafkaDatastoreBinderConfigurationProperties { + public Map binder = new HashMap<>(); + + public Map getBinder() { + return binder; + } + + public void setBinder(Map additionalProperties) { + this.binder = additionalProperties; + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java new file mode 100644 index 000000000000..51abaae3a2d1 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java @@ -0,0 +1,52 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + +import org.springframework.cloud.stream.binder.ConsumerProperties; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; + +public class KafkaDatastoreBinderProvisioner implements ProvisioningProvider { + + @Override + public ProducerDestination provisionProducerDestination( + final String name, + final ProducerProperties producerProperties) { + return new KafkaDatastoreDestination(name); + } + + @Override + public ConsumerDestination provisionConsumerDestination( + final String name, + final String group, + final ConsumerProperties consumerProperties) { + return new KafkaDatastoreDestination(name); + } + + private class KafkaDatastoreDestination implements ProducerDestination, ConsumerDestination { + + private final String destination; + + private KafkaDatastoreDestination(final String destination) { + this.destination = destination; + } + + @Override + public String getName() { + return destination; + } + + @Override + public String getNameForPartition(int partition) { + return destination; + } + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java new file mode 100644 index 000000000000..bd1cde486384 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java @@ -0,0 +1,111 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureWriter; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.data.Transaction; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.filter.Filter; +import org.geotools.api.filter.FilterFactory; +import org.geotools.factory.CommonFactoryFinder; +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.locationtech.geomesa.utils.geotools.FeatureUtils; +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.ConsumerProperties; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.io.IOException; +import java.util.Set; +import java.util.function.Supplier; + +public class KafkaDatastoreMessageBinder extends AbstractMessageChannelBinder { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreMessageBinder.class); + + private final Supplier dsFactory; + private final DataStore ds; + FeatureWriter writer; + FilterFactory ff; + + public KafkaDatastoreMessageBinder( + String[] headersToEmbed, + KafkaDatastoreBinderProvisioner provisioningProvider, + Supplier dsFactory + ) { + super(headersToEmbed, provisioningProvider); + this.dsFactory = dsFactory; + this.ds = dsFactory.get(); + ff = CommonFactoryFinder.getFilterFactory(); + } + + // Maybe handle a Collection and a Collection + @Override + protected MessageHandler createProducerMessageHandler( + final ProducerDestination destination, + final ProducerProperties producerProperties, + final MessageChannel errorChannel) { + return message -> { + String sfName = destination.getName(); + SimpleFeature payload; + + try { + var sft = SimpleFeatureTypeLoader.sftForName(sfName); + if (sft.isDefined()) { + ds.createSchema(sft.get()); + } else { + logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName); + } + + if (message.getPayload() instanceof SimpleFeature) { + payload = (SimpleFeature) message.getPayload(); + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureChanged) { + payload = ((KafkaFeatureEvent.KafkaFeatureChanged) message.getPayload()).feature(); + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureRemoved) { + var remove = (KafkaFeatureEvent.KafkaFeatureRemoved) message.getPayload(); + SimpleFeatureStore featureStore = (SimpleFeatureStore) ds.getFeatureSource(sfName); + featureStore.removeFeatures(ff.id(Set.of(ff.featureId(remove.id())))); + return; + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureCleared) { + SimpleFeatureStore featureStore = (SimpleFeatureStore) ds.getFeatureSource(sfName); + featureStore.removeFeatures(Filter.INCLUDE); + return; + } else { + logger.warn("Could not process message with header {} and payload {}", message.getHeaders(), message.getPayload()); + return; + } + + if (writer == null) { + writer = ds.getFeatureWriterAppend(sfName, Transaction.AUTO_COMMIT); + } + + FeatureUtils.write(writer, payload, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Override + protected MessageProducer createConsumerEndpoint( + final ConsumerDestination destination, + final String group, + final ConsumerProperties properties) { + return new KafkaDatastoreMessageProducer(destination, dsFactory); + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java new file mode 100644 index 000000000000..caac64aef182 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java @@ -0,0 +1,70 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureEvent; +import org.geotools.api.data.SimpleFeatureStore; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import java.io.IOException; +import java.util.function.Supplier; + +public class KafkaDatastoreMessageProducer extends MessageProducerSupport { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreMessageProducer.class); + + private final ConsumerDestination destination; + private final Supplier dsFactory; + private DataStore ds; + + public KafkaDatastoreMessageProducer(ConsumerDestination destination, + Supplier dsFactory) { + this.destination = destination; + this.dsFactory = dsFactory; + } + + @Override + public void doStart() { + SimpleFeatureStore fs = getSimpleFeatureStore(); + + fs.addFeatureListener(featureEvent -> { + Message receivedMessage = MessageBuilder + .withPayload(featureEvent) + .setHeader("contentType", "application/kafka-feature-event") + .build(); + sendMessage(receivedMessage); + }); + } + + private @NotNull SimpleFeatureStore getSimpleFeatureStore() { + SimpleFeatureStore fs = null; + while (fs == null) { + try { + this.ds = dsFactory.get(); + fs = (SimpleFeatureStore) ds.getFeatureSource(destination.getName()); + } catch (IOException e) { + logger.warn("Could not connect to KDS input, waiting for KDS to be created. Error: {}", e.getMessage()); + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + logger.info("Successfully connected to the input KDS!"); + return fs; + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java new file mode 100644 index 000000000000..b5ccbf3c7869 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java @@ -0,0 +1,54 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore.converters; + +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.geotools.api.feature.simple.SimpleFeature; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.util.MimeType; + +public class SimpleFeatureConverter extends AbstractMessageConverter { + + public SimpleFeatureConverter() { + super(new MimeType("application", "kafka-feature-event")); + } + + @Override + protected boolean supports(Class aClass) { + return KafkaFeatureEvent.KafkaFeatureChanged.class.equals(aClass); + } + + @Override + protected boolean canConvertFrom(Message message, Class targetClass) { + return message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureChanged; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + KafkaFeatureEvent.KafkaFeatureChanged event = (KafkaFeatureEvent.KafkaFeatureChanged) message.getPayload(); + return event.feature(); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + return payload; + } + + @Override + protected MimeType getDefaultContentType(Object toBeConverted) { + if (toBeConverted instanceof SimpleFeature) { + return new MimeType("application", "simple-feature"); + } else { + return super.getDefaultContentType(toBeConverted); + } + } +} diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders new file mode 100644 index 000000000000..141a8515bc72 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders @@ -0,0 +1,2 @@ +kafka-datastore:\ +com.ccri.geomesa.binder.kafka.datastore.KafkaDatastoreBinderConfiguration \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java new file mode 100644 index 000000000000..9bcc0bfe48dd --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java @@ -0,0 +1,168 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.spring.binder.kafka.datastore; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureWriter; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.filter.Filter; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.MessageBuilder; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class KafkaDatastoreMessageBinderTest { + + @Mock + DataStore ds; + @Mock + FeatureWriter featureWriter; + @Mock + SimpleFeatureStore simpleFeatureStore; + @Mock + SubscribableChannel errorChannel; + + Supplier dsFactory = () -> ds; + + @Test + public void producerMessageHandler_canWriteSft() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("mmsi", 123456); + Message message = MessageBuilder.withPayload(simpleFeature) + .setHeader("featureType", "application/simple-feature") + .build(); + + handler.handleMessage(message); + + verify(featureWriter).write(); + assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("mmsi", 123456); + simpleFeature.setAttribute("elevation", 100); + KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(changed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(featureWriter).write(); + assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); + assertThat(writeableFeature.getAttribute("elevation")).isEqualTo(100f); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventRemoved() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(simpleFeatureStore).when(ds).getFeatureSource("test-out"); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + KafkaFeatureEvent removed = new KafkaFeatureEvent.KafkaFeatureRemoved("test-out", "id1", null, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(removed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(simpleFeatureStore).removeFeatures(any(Filter.class)); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(simpleFeatureStore).when(ds).getFeatureSource("test-out"); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + KafkaFeatureEvent cleared = new KafkaFeatureEvent.KafkaFeatureCleared("test-out", Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(cleared) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE); + } + +} \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf new file mode 100644 index 000000000000..a277fe846853 --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf @@ -0,0 +1,7 @@ +geomesa.sfts.observation = { + attributes = [ + { name = "location", type = "Point" } + { name = "dtg", type = "Date" } + { name = "id", type = "String" } + ] +} \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf new file mode 100644 index 000000000000..028092e2438c --- /dev/null +++ b/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf @@ -0,0 +1 @@ +include "observation.conf" \ No newline at end of file diff --git a/geomesa-spring/pom.xml b/geomesa-spring/pom.xml new file mode 100644 index 000000000000..3028d73d7d7e --- /dev/null +++ b/geomesa-spring/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + org.locationtech.geomesa + geomesa_2.12 + 5.1.0-SNAPSHOT + + + geomesa-spring + pom + + geomesa-spring-cloud-stream-binder-kafka-datastore + + + + 11 + 11 + UTF-8 + 2021.0.9 + 2.7.18 + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 57b921fb0295..4e92f90f7ddc 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ geomesa-utils-parent geomesa-z3 docs + geomesa-spring From 70bb161611369915bd6a4266c5914aa6822dfe29 Mon Sep 17 00:00:00 2001 From: Walter Schultz Date: Wed, 5 Jun 2024 10:37:31 -0400 Subject: [PATCH 2/3] Moved module under geomesa-kafka --- docs/user/kafka/spring_stream_binder.rst | 191 ++++++++++++++++++ docs/user/spring/stream_binder.rst | 182 ----------------- .../pom.xml | 31 ++- .../KafkaDatastoreBinderConfiguration.java | 0 ...atastoreBinderConfigurationProperties.java | 0 .../KafkaDatastoreBinderProvisioner.java | 0 .../KafkaDatastoreMessageBinder.java | 0 .../KafkaDatastoreMessageProducer.java | 0 .../converters/SimpleFeatureConverter.java | 0 .../main/resources/META-INF/spring.binders | 0 .../KafkaDatastoreMessageBinderTest.java | 0 .../src/test/resources/observation.conf | 0 .../src/test/resources/reference.conf | 0 geomesa-kafka/pom.xml | 1 + geomesa-spring/pom.xml | 45 ----- pom.xml | 1 - 16 files changed, 221 insertions(+), 230 deletions(-) create mode 100644 docs/user/kafka/spring_stream_binder.rst delete mode 100644 docs/user/spring/stream_binder.rst rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/pom.xml (65%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/main/resources/META-INF/spring.binders (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/resources/observation.conf (100%) rename {geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore => geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder}/src/test/resources/reference.conf (100%) delete mode 100644 geomesa-spring/pom.xml diff --git a/docs/user/kafka/spring_stream_binder.rst b/docs/user/kafka/spring_stream_binder.rst new file mode 100644 index 000000000000..b92cd8ba6cc9 --- /dev/null +++ b/docs/user/kafka/spring_stream_binder.rst @@ -0,0 +1,191 @@ +Spring Cloud Stream Geomesa Kafka Datastore Binder +================================================== + +The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into +Geomesa Kafka Datastore to process events. + +If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: +https://spring.io/projects/spring-cloud-stream + +Input/Output Types +------------ + +This binder will provide all ``KafkaFeatureEvent`` s from kafka datastore to your configured function definitions. Each +function will have to do it's own type comparison to see if the event is a ``KafkaFeatureEvent.KafkaFeatureChanged``, +``KafkaFeatureEvent.KafkaFeatureRemoved``, or another event type. + +The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume +or produces ``SimpleFeature`` s and avoid working with ``KafkaFeatureEvent`` s directly. + +.. note:: + + The SimpleFeature converter extracts the SimpleFeature out of ``KafkaFeatureEvent.KafkaFeatureChanged`` events and + ignores all others. Any function definition that consumes SimpleFeatures will miss the + ``KafkaFeatureEvent.KafkaFeatureRemoved`` and ``KafkaFeatureEvent.KafkaFeatureCleared`` messages. And any function + definition that only writes SimpleFeatures will not be able to send those messages. + +Configuration +------------- + +The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any +configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). +For example, to specify kafka.catalog.topic for the binder, set: + +.. code-block:: yaml + + spring: + cloud: + stream: + kafka-datastore: + binder: + kafka-catalog-topic: geomesa-catalog-topic + +For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html + +Examples +-------- + +Simple Logger App +----------------- + +.. code-block:: java + + @Bean + public Consumer log() { + return obj -> logger.info(obj.toString()); + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: log + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + log-in-0: input + bindings: + input: + destination: messages + group: logger + +Simple Enricher App +------------------- + +.. code-block:: java + + @Bean + public Function attachSourceField() { + return sf -> { + sf.setAttribute("source", "un-labelled source"); + return sf; + }; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: attachSourceField + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + attachSourceField-in-0: input + attachSourceField-out-0: output + bindings: + input: + destination: un-labelled-source-ob + group: sft-reader + output: + destination: observations + group: sft-writer + +Simple Filter App +------------------- + +.. code-block:: java + + @Bean + public Function excludeMoving() { + return sf -> { + if (sf.getAttribute("status").equals("IN_TRANSIT")) { + return null; + } + return sf; + }; + } + + +.. code-block:: yaml + + spring: + cloud: + function: + definition: filterMoving + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + filterMoving-in-0: input + filterMoving-out-0: output + bindings: + input: + destination: movingAndUnmovingThings + group: sft-reader + output: + destination: unMovingThings + group: sft-writer + +Multiple Datastore App +---------------------- + +In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment +field. + +.. code-block:: java + + @Bean + public Function passThrough() { + return event -> event; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: passThrough + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + passThrough-in-0: input + passThrough-out-0: output + binders: + kds-start: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/start + kds-end: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/end + bindings: + input: + destination: observations + group: sft-reader + binder: kds-start + output: + destination: observations + group: sft-writer + binder: kds-end diff --git a/docs/user/spring/stream_binder.rst b/docs/user/spring/stream_binder.rst deleted file mode 100644 index be5cf896d9da..000000000000 --- a/docs/user/spring/stream_binder.rst +++ /dev/null @@ -1,182 +0,0 @@ -Spring Cloud Stream Geomesa Kafka Datastore Binder -================================================== - -The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into -Geomesa Kafka Datastore to process events. - -If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: -https://spring.io/projects/spring-cloud-stream - -Input/Output Types ------------- - -This binder will provide all `KafkaFeatureEvent`s from kafka datastore to your configured function definitions. Each -function will have to do it's own type comparision to see if the event is a `KafkaFeatureEvent.KafkaFeatureChanged`, -`KafkaFeatureEvent.KafkaFeatureRemoved`, or another event type. - -The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume -or produces `SimpleFeature`s and avoid working with `KafkaFeatureEvent`s directly. - -Note: The SimpleFeature converter extracts the SimpleFeature out of `KafkaFeatureEvent.KafkaFeatureChanged` events and -ignores all others. Any function definition that consumes SimpleFeatures will miss the -`KafkaFeatureEvent.KafkaFeatureRemoved` and ``KafkaFeatureEvent.KafkaFeatureCleared` messages. And any function -definition that only writes SimpleFeatures will not be able to send those messages. - -Configuration -------------- - -The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any -configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). -For example, to specify kafka.catalog.topic for the binder, set: -```yaml -spring: - cloud: - stream: - kafka-datastore: - binder: - kafka-catalog-topic: geomesa-catalog-topic -``` - -For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html - -Examples --------- - -Simple Logger App ------------------ -``` -@Bean -public Consumer log() { - return obj -> logger.info(obj.toString()); -} -``` -``` -spring: - cloud: - function: - definition: log - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - log-in-0: input - bindings: - input: - destination: messages - group: logger -``` - -Simple Enricher App -------------------- - -``` -@Bean -public Function attachSourceField() { - return sf -> { - sf.setAttribute("source", "un-labelled source"); - return sf; - }; -} -``` -``` -spring: - cloud: - function: - definition: attachSourceField - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - attachSourceField-in-0: input - attachSourceField-out-0: output - bindings: - input: - destination: un-labelled-source-ob - group: sft-reader - output: - destination: observations - group: sft-writer -``` - -Simple Filter App -------------------- -``` -@Bean -public Function excludeMoving() { - return sf -> { - if (sf.getAttribute("status").equals("IN_TRANSIT")) { - return null; - } - return sf; - }; -} -``` -``` -spring: - cloud: - function: - definition: filterMoving - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - filterMoving-in-0: input - filterMoving-out-0: output - bindings: - input: - destination: movingAndUnmovingThings - group: sft-reader - output: - destination: unMovingThings - group: sft-writer -``` - -Multiple Datastore App ----------------------- - -In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment -field. - -``` -@Bean -public Function passThrough() { - return event -> event; -} -``` -``` -spring: - cloud: - function: - definition: passThrough - stream: - kafka-datastore.binder: - kafka-brokers: kafka:9092 - kafka-zookeepers: zookeeper:2181 - function.bindings: - passThrough-in-0: input - passThrough-out-0: output - binders: - kds-start: - type: kafka-datastore - environment: - spring.cloud.stream.kafka-datastore.binder: - kafka-zk-path: geomesa/start - kds-end: - type: kafka-datastore - environment: - spring.cloud.stream.kafka-datastore.binder: - kafka-zk-path: geomesa/end - bindings: - input: - destination: observations - group: sft-reader - binder: kds-start - output: - destination: observations - group: sft-writer - binder: kds-end - -``` \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml similarity index 65% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml index 7a1ed6bf131e..b27a34e0f48b 100644 --- a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/pom.xml +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml @@ -5,16 +5,19 @@ 4.0.0 org.locationtech.geomesa - geomesa-spring + geomesa-kafka_2.12 5.1.0-SNAPSHOT - geomesa-spring-cloud-stream-binder-kafka-datastore + geomesa-kafka-spring-cloud-stream-binder + GeoMesa Kafka Spring Cloud Stream Binder 11 11 UTF-8 + 2021.0.9 + 2.7.18 @@ -61,6 +64,30 @@ + + org.junit.vintage + junit-vintage-engine + test + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + \ No newline at end of file diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/main/resources/META-INF/spring.binders rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/observation.conf rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf diff --git a/geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf similarity index 100% rename from geomesa-spring/geomesa-spring-cloud-stream-binder-kafka-datastore/src/test/resources/reference.conf rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf diff --git a/geomesa-kafka/pom.xml b/geomesa-kafka/pom.xml index 994adb5ca33c..3b8898fde893 100644 --- a/geomesa-kafka/pom.xml +++ b/geomesa-kafka/pom.xml @@ -18,6 +18,7 @@ geomesa-kafka-gs-plugin geomesa-kafka-tools geomesa-kafka-utils + geomesa-kafka-spring-cloud-stream-binder diff --git a/geomesa-spring/pom.xml b/geomesa-spring/pom.xml deleted file mode 100644 index 3028d73d7d7e..000000000000 --- a/geomesa-spring/pom.xml +++ /dev/null @@ -1,45 +0,0 @@ - - - 4.0.0 - - org.locationtech.geomesa - geomesa_2.12 - 5.1.0-SNAPSHOT - - - geomesa-spring - pom - - geomesa-spring-cloud-stream-binder-kafka-datastore - - - - 11 - 11 - UTF-8 - 2021.0.9 - 2.7.18 - - - - - - org.springframework.boot - spring-boot-dependencies - ${spring-boot.version} - pom - import - - - org.springframework.cloud - spring-cloud-dependencies - ${spring-cloud.version} - pom - import - - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4e92f90f7ddc..57b921fb0295 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,6 @@ geomesa-utils-parent geomesa-z3 docs - geomesa-spring From a8ecb36c8d3a3fc4333092a5d1c43acdb1e2feda Mon Sep 17 00:00:00 2001 From: Walter Schultz Date: Wed, 5 Jun 2024 14:08:49 -0400 Subject: [PATCH 3/3] Added a check for sft in the kds, if it isn't on the classpath --- .../pom.xml | 31 ++++++-- .../KafkaDatastoreBinderConfiguration.java | 4 +- ...atastoreBinderConfigurationProperties.java | 4 +- .../KafkaDatastoreBinderProvisioner.java | 2 +- .../binder}/KafkaDatastoreMessageBinder.java | 9 ++- .../KafkaDatastoreMessageProducer.java | 2 +- .../converters/SimpleFeatureConverter.java | 2 +- .../main/resources/META-INF/spring.binders | 2 +- .../KafkaDatastoreMessageBinderTest.java | 72 +++++++++++++------ 9 files changed, 94 insertions(+), 34 deletions(-) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderConfiguration.java (94%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderConfigurationProperties.java (89%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreBinderProvisioner.java (96%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageBinder.java (92%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageProducer.java (97%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/converters/SimpleFeatureConverter.java (96%) rename geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/{spring/binder/kafka/datastore => kafka/spring/binder}/KafkaDatastoreMessageBinderTest.java (74%) diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml index b27a34e0f48b..cf487d828cb5 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml @@ -13,8 +13,6 @@ GeoMesa Kafka Spring Cloud Stream Binder - 11 - 11 UTF-8 2021.0.9 2.7.18 @@ -32,7 +30,14 @@ org.springframework.boot spring-boot-starter-log4j2 + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.curator @@ -62,11 +67,29 @@ org.apache.logging.log4j log4j-to-slf4j + + ch.qos.logback + logback-classic + + + org.junit.jupiter + junit-jupiter + - org.junit.vintage - junit-vintage-engine + junit + junit + test + + + + + + + + org.slf4j + slf4j-reload4j test diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java similarity index 94% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java index dfd5cf4db7bb..e7bb93592c1c 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfiguration.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java @@ -6,11 +6,11 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.DataStoreFinder; -import org.locationtech.geomesa.spring.binder.kafka.datastore.converters.SimpleFeatureConverter; +import org.locationtech.geomesa.kafka.spring.binder.converters.SimpleFeatureConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java similarity index 89% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java index e466ef9105ea..558970012e34 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderConfigurationProperties.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -15,7 +15,7 @@ import java.util.Map; @ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore") -public class KafkaDatastoreBinderConfigurationProperties { + public class KafkaDatastoreBinderConfigurationProperties { public Map binder = new HashMap<>(); public Map getBinder() { diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java similarity index 96% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java index 51abaae3a2d1..5b97b2f3e44b 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.springframework.cloud.stream.binder.ConsumerProperties; import org.springframework.cloud.stream.binder.ProducerProperties; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java similarity index 92% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java index bd1cde486384..acd190c69d21 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinder.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.FeatureWriter; @@ -69,7 +69,12 @@ protected MessageHandler createProducerMessageHandler( if (sft.isDefined()) { ds.createSchema(sft.get()); } else { - logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName); + try { + ds.getSchema(sfName); + logger.debug("There is no local schema for {}, but we found it in the kds", sfName); + } catch (IOException e) { + logger.error("There is no sft schema {} in the kds {} or locally", sfName, ds.getInfo().getDescription(), e); + } } if (message.getPayload() instanceof SimpleFeature) { diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java similarity index 97% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java index caac64aef182..98e4b4f85944 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageProducer.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java similarity index 96% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java index b5ccbf3c7869..496aef697e56 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/spring/binder/kafka/datastore/converters/SimpleFeatureConverter.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore.converters; +package org.locationtech.geomesa.kafka.spring.binder.converters; import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; import org.geotools.api.feature.simple.SimpleFeature; diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders index 141a8515bc72..efa66a3b0df4 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders @@ -1,2 +1,2 @@ kafka-datastore:\ -com.ccri.geomesa.binder.kafka.datastore.KafkaDatastoreBinderConfiguration \ No newline at end of file +org.locationtech.geomesa.kafka.spring.binder.KafkaDatastoreBinderConfiguration \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java similarity index 74% rename from geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java rename to geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java index 9bcc0bfe48dd..077a5ef1f382 100644 --- a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreMessageBinderTest.java +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java @@ -6,7 +6,7 @@ * http://www.opensource.org/licenses/apache2.0.php. ***********************************************************************/ -package org.locationtech.geomesa.spring.binder.kafka.datastore; +package org.locationtech.geomesa.kafka.spring.binder; import org.geotools.api.data.DataStore; import org.geotools.api.data.FeatureWriter; @@ -15,12 +15,10 @@ import org.geotools.api.feature.simple.SimpleFeatureType; import org.geotools.api.filter.Filter; import org.geotools.feature.simple.SimpleFeatureBuilder; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.Before; +import org.junit.Test; import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.messaging.Message; @@ -31,29 +29,33 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -@ExtendWith(MockitoExtension.class) -class KafkaDatastoreMessageBinderTest { +public class KafkaDatastoreMessageBinderTest { - @Mock DataStore ds; - @Mock FeatureWriter featureWriter; - @Mock SimpleFeatureStore simpleFeatureStore; - @Mock SubscribableChannel errorChannel; - - Supplier dsFactory = () -> ds; + Supplier dsFactory; + + @Before + public void init() { + ds = mock(DataStore.class); + featureWriter = mock(FeatureWriter.class); + simpleFeatureStore = mock(SimpleFeatureStore.class); + errorChannel = mock(SubscribableChannel.class); + dsFactory = () -> ds; + } @Test public void producerMessageHandler_canWriteSft() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); @@ -70,7 +72,7 @@ public void producerMessageHandler_canWriteSft() throws IOException { SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); - simpleFeature.setAttribute("mmsi", 123456); + simpleFeature.setAttribute("id", "123456"); Message message = MessageBuilder.withPayload(simpleFeature) .setHeader("featureType", "application/simple-feature") .build(); @@ -78,11 +80,12 @@ public void producerMessageHandler_canWriteSft() throws IOException { handler.handleMessage(message); verify(featureWriter).write(); - assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); } @Test public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException { + var now = Instant.now(); SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); @@ -99,8 +102,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); - simpleFeature.setAttribute("mmsi", 123456); - simpleFeature.setAttribute("elevation", 100); + simpleFeature.setAttribute("id", "123456"); + simpleFeature.setAttribute("dtg", now); KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); Message message = MessageBuilder.withPayload(changed) .setHeader("featureType", "application/kafka-feature-event") @@ -109,8 +112,8 @@ public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOE handler.handleMessage(message); verify(featureWriter).write(); - assertThat(writeableFeature.getAttribute("mmsi")).isEqualTo(123456); - assertThat(writeableFeature.getAttribute("elevation")).isEqualTo(100f); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); + assertThat(writeableFeature.getAttribute("dtg")).isEqualTo(Date.from(now)); } @Test @@ -165,4 +168,33 @@ public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOExc verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE); } + @Test + public void producerMessageHandler_loadSftFromClasspath() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("observation", producerProperties); + + // Doesn't throw an error + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("id", "123456"); + KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(changed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(ds).createSchema(any()); + } + } \ No newline at end of file