#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-long-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2
if [[
$(docker image ls -q $IMAGE) == "" ||
docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
+docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2
+sleep 15
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
+docker compose -f docker/docker-compose.yml stop producer
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
+echo
+echo "Von consumer-1 empfangen:"
+docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.'
+echo
+echo "Von consumer-2 empfangen:"
+docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.'
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2
- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-long-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: producer
juplo.producer.topic: test
- juplo.producer.linger-ms: 666
- juplo.producer.throttle-ms: 100
- consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ consumer-1:
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
- juplo.client-id: consumer
+ juplo.client-id: consumer-1
+ juplo.consumer.topic: test
+
+ consumer-2:
+ image: juplo/spring-consumer:1.1-long-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: consumer-2
juplo.consumer.topic: test
volumes:
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super-simple consumer group, implemented as a Spring Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-long-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
{
@Bean
public ExampleConsumer exampleConsumer(
- Consumer<String, String> kafkaConsumer,
+ Consumer<String, Long> kafkaConsumer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
}
@Bean(destroyMethod = "")
- public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("metadata.maxage.ms", 5000); // 5 Sekunden
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", LongDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
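
A purely optional aside, not part of the change above: the bean configures the consumer via raw string keys. The same settings can also be expressed with the constants defined in org.apache.kafka.clients.consumer.ConsumerConfig, which lets the compiler catch mistyped keys. The sketch below assumes it replaces the kafkaConsumer bean inside the same ApplicationConfiguration class, with ApplicationProperties and getBootstrapServer() taken from the listing above; only the ConsumerConfig import is new.

import org.apache.kafka.clients.consumer.ConsumerConfig;

@Bean(destroyMethod = "")
public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
  // Identical settings, addressed through the client's own constants.
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
  props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 5000); // 5 seconds
  props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
  return new KafkaConsumer<>(props);
}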
{
private final String id;
private final String topic;
- private final Consumer<String, String> consumer;
+ private final Consumer<String, Long> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer,
+ Consumer<String, Long> consumer,
Runnable closeCallback)
{
this.id = clientId;
while (running)
{
- ConsumerRecords<String, String> records =
+ ConsumerRecords<String, Long> records =
consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
+ for (ConsumerRecord<String, Long> record : records)
{
handleRecord(
record.topic(),
Integer partition,
Long offset,
String key,
- String value)
+ Long value)
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);