Skip to content

Commit 8f0059c

Browse files
author
Stefano Guerrini
authored
Merge pull request #262 from GetFeedback/feature/introduce-kafka-streams-health-checks
Feature/introduce kafka streams health checks
2 parents 807adfa + 083f922 commit 8f0059c

File tree

6 files changed

+195
-1
lines changed

6 files changed

+195
-1
lines changed

kahpp-spring-autoconfigure/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ dependencies{
3030
implementation "io.burt:jmespath-core:0.5.1"
3131
implementation "io.burt:jmespath-jackson:0.5.1"
3232
implementation "io.vavr:vavr:1.0.0-alpha-4"
33-
implementation "com.deviceinsight.kafka:kafka-health-check:1.3.0"
3433
implementation "org.springframework.kafka:spring-kafka"
3534
implementation "org.springframework.boot:spring-boot-starter"
3635
implementation "org.springframework.boot:spring-boot-starter-validation"
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package dev.vox.platform.kahpp.actuator;
2+
3+
import io.micrometer.core.instrument.FunctionCounter;
4+
import io.micrometer.core.instrument.MeterRegistry;
5+
import io.micrometer.core.instrument.search.Search;
6+
import java.util.Optional;
7+
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
8+
import org.springframework.boot.actuate.health.Health;
9+
import org.springframework.boot.actuate.health.Status;
10+
import org.springframework.stereotype.Component;
11+
12+
@Component
13+
public class KafkaStreamsProducerState extends AbstractHealthIndicator {
14+
15+
private final MeterRegistry meterRegistry;
16+
17+
public static final String KAFKA_PRODUCER_CONNECTION_METRIC_LABEL =
18+
"kafka.producer.connection.count";
19+
20+
public KafkaStreamsProducerState(MeterRegistry meterRegistry) {
21+
this.meterRegistry = meterRegistry;
22+
}
23+
24+
@Override
25+
protected void doHealthCheck(Health.Builder builder) {
26+
Search connectionsSearch = meterRegistry.find(KAFKA_PRODUCER_CONNECTION_METRIC_LABEL);
27+
Double kafkaConnections =
28+
Optional.ofNullable(connectionsSearch.functionCounter())
29+
.map(FunctionCounter::count)
30+
.orElse(0d);
31+
if (kafkaConnections > 0) {
32+
builder.status(Status.UP);
33+
return;
34+
}
35+
builder.status(Status.DOWN);
36+
}
37+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package dev.vox.platform.kahpp.actuator;
2+
3+
import java.util.List;
4+
import java.util.Objects;
5+
import org.apache.kafka.streams.KafkaStreams;
6+
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
7+
import org.springframework.boot.actuate.health.Health;
8+
import org.springframework.boot.actuate.health.Status;
9+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
10+
import org.springframework.stereotype.Component;
11+
12+
@Component
13+
public class KafkaStreamsState extends AbstractHealthIndicator {
14+
15+
private final List<StreamsBuilderFactoryBean> streamsBuilders;
16+
17+
public KafkaStreamsState(List<StreamsBuilderFactoryBean> streamsBuilders) {
18+
this.streamsBuilders = List.copyOf(streamsBuilders);
19+
}
20+
21+
@Override
22+
@SuppressWarnings("PMD.CloseResource")
23+
protected void doHealthCheck(Health.Builder builder) {
24+
List<KafkaStreams> streamsList =
25+
streamsBuilders.stream()
26+
.map(StreamsBuilderFactoryBean::getKafkaStreams)
27+
.filter(Objects::nonNull)
28+
.toList();
29+
30+
if (streamsList.isEmpty()) {
31+
builder.status(Status.UNKNOWN);
32+
return;
33+
}
34+
35+
for (KafkaStreams streams : streamsList) {
36+
if (streams.state().hasNotStarted()) {
37+
builder.status(Status.UNKNOWN);
38+
return;
39+
} else if (!streams.state().isRunningOrRebalancing()) {
40+
builder.status(Status.DOWN);
41+
return;
42+
}
43+
}
44+
45+
builder.status(Status.UP);
46+
}
47+
}

kahpp-spring-autoconfigure/src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ management.endpoint.metrics.enabled=true
1414
management.endpoint.health.show-details=always
1515
management.metrics.export.jmx.domain=dev.vox
1616
management.endpoints.web.exposure.include=metrics,health,prometheus
17+
management.endpoint.health.group.readiness.include=readinessState,kafkaStreamsProducerState,kafkaStreamsState
18+
management.endpoint.health.group.liveness.include=livenessState,kafkaStreamsProducerState,kafkaStreamsState
1719
spring.config.import=${KAHPP_CONFIG_LOCATION:file:/kahpp/application.yaml}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package dev.vox.platform.kahpp.actuator;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.micrometer.core.instrument.*;
6+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.extension.ExtendWith;
10+
import org.mockito.junit.jupiter.MockitoExtension;
11+
import org.springframework.boot.actuate.health.Health;
12+
import org.springframework.boot.actuate.health.Status;
13+
14+
@ExtendWith(MockitoExtension.class)
15+
class KafkaStreamsProducerStateTest {
16+
17+
private MeterRegistry meterRegistry;
18+
19+
@BeforeEach
20+
public void setUp() {
21+
meterRegistry = new SimpleMeterRegistry();
22+
}
23+
24+
@Test
25+
void statusDownWhenProducerIsNotActive() {
26+
meterRegistry
27+
.more()
28+
.counter(KafkaStreamsProducerState.KAFKA_PRODUCER_CONNECTION_METRIC_LABEL, Tags.empty(), 0);
29+
Health.Builder builder = new Health.Builder();
30+
new KafkaStreamsProducerState(meterRegistry).doHealthCheck(builder);
31+
assertThat(builder.build().getStatus()).isEqualTo(Status.DOWN);
32+
}
33+
34+
@Test
35+
void statusUpWhenProducerIsActive() {
36+
meterRegistry
37+
.more()
38+
.counter(KafkaStreamsProducerState.KAFKA_PRODUCER_CONNECTION_METRIC_LABEL, Tags.empty(), 1);
39+
Health.Builder builder = new Health.Builder();
40+
new KafkaStreamsProducerState(meterRegistry).doHealthCheck(builder);
41+
assertThat(builder.build().getStatus()).isEqualTo(Status.UP);
42+
}
43+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package dev.vox.platform.kahpp.actuator;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.Mockito.lenient;
5+
import static org.mockito.Mockito.when;
6+
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.Map;
10+
import org.apache.kafka.streams.KafkaStreams;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.junit.jupiter.api.extension.ExtendWith;
14+
import org.mockito.Mock;
15+
import org.mockito.junit.jupiter.MockitoExtension;
16+
import org.springframework.boot.actuate.health.Health;
17+
import org.springframework.boot.actuate.health.Status;
18+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
19+
20+
@ExtendWith(MockitoExtension.class)
21+
class KafkaStreamsStateTest {
22+
23+
@Mock private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
24+
25+
@Mock private KafkaStreams kafkaStreams;
26+
27+
Map<KafkaStreams.State, Status> kafkaStateMapping =
28+
Map.of(
29+
KafkaStreams.State.CREATED, Status.UNKNOWN,
30+
KafkaStreams.State.RUNNING, Status.UP,
31+
KafkaStreams.State.REBALANCING, Status.UP,
32+
KafkaStreams.State.ERROR, Status.DOWN,
33+
KafkaStreams.State.PENDING_ERROR, Status.DOWN,
34+
KafkaStreams.State.PENDING_SHUTDOWN, Status.DOWN,
35+
KafkaStreams.State.NOT_RUNNING, Status.DOWN);
36+
37+
@BeforeEach
38+
public void setUp() {
39+
lenient().when(streamsBuilderFactoryBean.getKafkaStreams()).thenReturn(kafkaStreams);
40+
}
41+
42+
@Test
43+
void checkIfStatusIsUnknownWhenStreamsIsNull() {
44+
Health.Builder builder = new Health.Builder();
45+
new KafkaStreamsState(List.of()).doHealthCheck(builder);
46+
assertThat(builder.build().getStatus()).isEqualTo(Status.UNKNOWN);
47+
}
48+
49+
@Test
50+
@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
51+
void checkAllKafkaStreamsStatuses() {
52+
KafkaStreamsState kafkaStreamsState = new KafkaStreamsState(List.of(streamsBuilderFactoryBean));
53+
Health.Builder builder = new Health.Builder();
54+
55+
new KafkaStreamsState(List.of(streamsBuilderFactoryBean));
56+
Arrays.stream(KafkaStreams.State.values())
57+
.forEach(
58+
state -> {
59+
when(kafkaStreams.state()).thenReturn(state);
60+
kafkaStreamsState.doHealthCheck(builder);
61+
assertThat(builder.build().getStatus())
62+
.as("Testing %s Kafka Stream State", state)
63+
.isEqualTo(kafkaStateMapping.get(state));
64+
});
65+
}
66+
}

0 commit comments

Comments
 (0)