`EndlessConsumer` switched over to `@KafkaHandler`
author Kai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 16:04:04 +0000 (18:04 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 14:42:49 +0000 (16:42 +0200)
* With `@KafkaHandler`, separate handler methods can be defined for each of
  the message types that the application receives (here: via a single topic;
  receiving them via different topics is also possible), as shown in the
  sketch below.
* The tests had to be adapted to the changed implementation.
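
For illustration, a minimal sketch of the dispatch mechanism this commit switches
to. The class name, listener id, and topic below are hypothetical; `MessageAddNumber`
and `MessageCalculateSum` are the payload types used in this project, and the class
is assumed to be registered as a Spring bean:

```java
package de.juplo.kafka;

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;

// Class-level @KafkaListener: every record of the topic is delivered to this bean.
// Spring Kafka then picks the @KafkaHandler method whose parameter type matches
// the converted payload of the record.
@KafkaListener(id = "dispatch-sketch", topics = "sketch-topic")
public class DispatchSketch
{
  @KafkaHandler
  public void onAdd(MessageAddNumber message)
  {
    // invoked for records whose payload was converted to MessageAddNumber
  }

  @KafkaHandler
  public void onCalc(MessageCalculateSum message)
  {
    // invoked for records whose payload was converted to MessageCalculateSum
  }
}
```

Note that this type-based dispatch only works if the configured message converter
actually deserializes each record into one of these concrete types (for example via
JSON type mapping); otherwise every record arrives with the same payload type and
only one handler can ever match.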

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/RecordHandler.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java
src/test/java/de/juplo/kafka/TestRecordHandler.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index f8bf857..1755747 100644
@@ -53,14 +53,14 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, Message> endlessConsumer(
+  public EndlessConsumer endlessConsumer(
       RecordHandler recordHandler,
       ApplicationErrorHandler errorHandler,
       KafkaProperties kafkaProperties,
       KafkaListenerEndpointRegistry endpointRegistry)
   {
     return
-        new EndlessConsumer<>(
+        new EndlessConsumer(
             kafkaProperties.getClientId(),
             endpointRegistry,
             errorHandler,
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index 03a14c8..ab9782c 100644
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Message> consumer;
+  private final EndlessConsumer consumer;
 
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index 2829157..f4d3671 100644
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -12,7 +11,7 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, Message>
+public class ApplicationRecordHandler implements RecordHandler
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -21,42 +20,34 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  @Override
   public void addNumber(
+      String topic,
       Integer partition,
+      Long offset,
       String user,
       MessageAddNumber message)
   {
     state.get(partition).addToSum(user, message.getNext());
+    throttle();
   }
 
+  @Override
   public void calculateSum(
+      String topic,
       Integer partition,
+      Long offset,
       String user,
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
+    throttle();
   }
 
-  @Override
-  public void accept(ConsumerRecord<String, Message> record)
+  private void throttle()
   {
-    Integer partition = record.partition();
-    String user = record.key();
-    Message message = record.value();
-
-    switch(message.getType())
-    {
-      case ADD:
-        addNumber(partition, user, (MessageAddNumber) message);
-        break;
-
-      case CALC:
-        calculateSum(partition, user, (MessageCalculateSum) message);
-        break;
-    }
-
     if (throttle.isPresent())
     {
       try
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 01397a2..655151a 100644
@@ -3,8 +3,12 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
 
 import java.util.List;
 import java.util.Optional;
@@ -12,34 +16,63 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class EndlessConsumer<K, V>
+@KafkaListener(
+    id = "${spring.kafka.client-id}",
+    idIsGroup = false,
+    topics = "${sumup.adder.topic}",
+    autoStartup = "false")
+public class EndlessConsumer
 {
   private final String id;
   private final KafkaListenerEndpointRegistry registry;
   private final ApplicationErrorHandler errorHandler;
-  private final RecordHandler<K, V> recordHandler;
+  private final RecordHandler recordHandler;
 
   private long consumed = 0;
 
 
-  @KafkaListener(
-      id = "${spring.kafka.client-id}",
-      idIsGroup = false,
-      topics = "${sumup.adder.topic}",
-      autoStartup = "false")
-  public void accept(ConsumerRecord<K, V> record)
+  @KafkaHandler
+  public void addNumber(
+    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
+    @Payload MessageAddNumber message)
   {
           log.info(
               "{} - {}: {}/{} - {}={}",
               id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
+              offset,
+              topic,
+              partition,
+              key,
+              message
           );
 
-          recordHandler.accept(record);
+          recordHandler.addNumber(topic, partition, offset, key, message);
+
+          consumed++;
+  }
+
+  @KafkaHandler
+  public void calculateSum(
+    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
+    @Payload MessageCalculateSum message)
+  {
+          log.info(
+              "{} - {}: {}/{} - {}={}",
+              id,
+              offset,
+              topic,
+              partition,
+              key,
+              message
+          );
+
+          recordHandler.calculateSum(topic, partition, offset, key, message);
 
           consumed++;
   }
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
index 327ac9f..47f984e 100644
@@ -5,6 +5,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import java.util.function.Consumer;
 
 
-public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+public interface RecordHandler
 {
+  void addNumber(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageAddNumber message);
+  void calculateSum(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageCalculateSum message);
 }
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 124143c..49ddb47 100644
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -69,9 +68,9 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        @Autowired
-       TestRecordHandler<K, V> recordHandler;
+       TestRecordHandler recordHandler;
        @Autowired
-       EndlessConsumer<K, V> endlessConsumer;
+       EndlessConsumer endlessConsumer;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
@@ -99,7 +98,7 @@ abstract class GenericApplicationTests<K, V>
                await(numberOfGeneratedMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
+                               .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -141,7 +140,7 @@ abstract class GenericApplicationTests<K, V>
 
                checkSeenOffsetsForProgress();
                assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedRecords.size())
+               assertThat(recordHandler.receivedMessages)
                                .describedAs("Received not all sent events")
                                .isLessThan(numberOfGeneratedMessages);
 
@@ -348,7 +347,7 @@ abstract class GenericApplicationTests<K, V>
 
                oldOffsets = new HashMap<>();
                recordHandler.seenOffsets = new HashMap<>();
-               recordHandler.receivedRecords = new HashSet<>();
+               recordHandler.receivedMessages = 0;
 
                doForCurrentOffsets((tp, offset) ->
                {
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
index 37d3f65..d9f4e67 100644
@@ -1,34 +1,52 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 
 @RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
 {
-  private final RecordHandler<K, V> handler;
+  private final RecordHandler handler;
 
   Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
+  int receivedMessages;
 
 
-  public void onNewRecord(ConsumerRecord<K, V> record)
+  public void onNewRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      Message message)
   {
-    seenOffsets.put(
-      new TopicPartition(record.topic(), record.partition()),
-      record.offset());
-    receivedRecords.add(record);
+    seenOffsets.put(new TopicPartition(topic, partition), offset);
+    receivedMessages++;
   }
 
   @Override
-  public void accept(ConsumerRecord<K, V> record)
+  public void addNumber(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageAddNumber message)
   {
-    this.onNewRecord(record);
-    handler.accept(record);
+    this.onNewRecord(topic, partition, offset, message);
+    handler.addNumber(topic, partition, offset, user, message);
+  }
+
+  @Override
+  public void calculateSum(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageCalculateSum message)
+  {
+    this.onNewRecord(topic, partition, offset, message);
+    handler.calculateSum(topic, partition, offset, user, message);
   }
 }