public class ApplicationConfiguration
{
@Bean
- public ExampleConsumer exampleConsumer(
+ public ExampleConsumer<String, String> exampleConsumer(
Consumer<String, String> kafkaConsumer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
return
- new ExampleConsumer(
+ new ExampleConsumer<>(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
@Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer<K, V> implements Runnable
{
private final String id;
private final String topic;
- private final Consumer<String, String> consumer;
+ private final Consumer<K, V> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer,
+ Consumer<K, V> consumer,
Runnable closeCallback)
{
this.id = clientId;
while (running)
{
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+ ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
+ for (ConsumerRecord<K, V> record : records)
{
handleRecord(
record.topic(),
String topic,
Integer partition,
Long offset,
- String key,
- String value)
+ K key,
+ V value)
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);