#!/bin/bash
-IMAGE=juplo/spring-consumer:1.0-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
if [ "$1" = "cleanup" ]
then
command: kafka:9092 test producer
consumer:
- image: juplo/spring-consumer:1.0-SNAPSHOT
+ image: juplo/spring-consumer:1.1-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.0-SNAPSHOT</version>
+ <version>1.1-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
return
new SimpleConsumer(
kafkaProperties.getClientId(),
- applicationProperties.getTopic(),
+ applicationProperties.getTopics(),
kafkaConsumer);
}
{
@NotNull
@NotEmpty
- private String topic;
+ private String[] topics;
}
public class SimpleConsumer implements Callable<Integer>
{
private final String id;
- private final String topic;
+ private final String[] topics;
private final Consumer<String, String> consumer;
private long consumed = 0;
{
try
{
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ log.info("{} - Subscribing to topics: {}", id, topics);
+ consumer.subscribe(Arrays.asList(topics));
while (true)
{
simple:
consumer:
- topic: test
+ topics: test
management:
endpoint:
shutdown:
bootstrap-server: ${spring.kafka.bootstrap-servers}
client-id: ${spring.kafka.client-id}
group-id: ${spring.kafka.consumer.group-id}
- topic: ${simple.consumer.topic}
+ topics: ${simple.consumer.topics}
auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
spring:
kafka:
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "simple.consumer.topic=" + TOPIC })
+ "simple.consumer.topics=" + TOPIC })
@EmbeddedKafka(topics = TOPIC)
public class ApplicationIT
{