#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-generics-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.1-SNAPSHOT'
+version = '1.1-generics-SNAPSHOT'
java {
toolchain {
mem_limit: 100m
consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-generics-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-generics-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: peter
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-generics-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: ute
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-generics-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
public class ApplicationConfiguration
{
@Bean
- public ExampleConsumer exampleConsumer(
+ public ExampleConsumer<String, String> exampleConsumer(
Consumer<String, String> kafkaConsumer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
return
- new ExampleConsumer(
+ new ExampleConsumer<>(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer);
@Slf4j
-public class ExampleConsumer implements Runnable, SmartLifecycle
+public class ExampleConsumer<K, V> implements Runnable, SmartLifecycle
{
private final String id;
private final String topic;
- private final Consumer<String, String> consumer;
+ private final Consumer<K, V> consumer;
private Thread workerThread;
private volatile boolean running = false;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer)
+ Consumer<K, V> consumer)
{
this.id = clientId;
this.topic = topic;
while (true)
{
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
+ for (ConsumerRecord<K, V> record : records)
{
handleRecord(
record.topic(),
String topic,
Integer partition,
Long offset,
- String key,
- String value)
+ K key,
+ V value)
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);