WIP:kafkahandler wip-kafka-handler
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 11:21:58 +0000 (13:21 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 11:21:58 +0000 (13:21 +0200)
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/RecordHandler.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java
src/test/java/de/juplo/kafka/TestRecordHandler.java

index 61b265b..1804951 100644 (file)
@@ -34,46 +34,35 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
 
 
   @KafkaHandler
+  @Override
   public void addNumber(
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
     @Payload MessageAddNumber message)
   {
     state.get(partition).addToSum(user, message.getNext());
+    throttle();
   }
 
   @KafkaHandler
+  @Override
   public void calcSum(
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
     @Payload MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
+    throttle();
   }
 
-  @Override
-  public void accept(ConsumerRecord<String, Message> record)
+  private void throttle()
   {
-    Integer partition = record.partition();
-    String user = record.key();
-    Message message = record.value();
-
-    switch(message.getType())
-    {
-      case ADD:
-        MessageAddNumber addNumber = (MessageAddNumber)message;
-        state.get(partition).addToSum(user, addNumber.getNext());
-        break;
-
-      case CALC:
-        AdderResult result = state.get(partition).calculate(user);
-        log.info("{} - New result for {}: {}", id, user, result);
-        results.addResults(partition, user, result);
-        break;
-    }
-
     if (throttle.isPresent())
     {
       try
index 327ac9f..d71bfed 100644 (file)
@@ -5,6 +5,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import java.util.function.Consumer;
 
 
-public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+public interface RecordHandler<K, V>
 {
+  void addNumber(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageAddNumber message);
+  void calcSum(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageCalculateSum message);
 }
index 1e28b06..8e81e26 100644 (file)
@@ -68,7 +68,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        @Autowired
-       TestRecordHandler<K, V> recordHandler;
+       TestRecordHandler recordHandler;
        @Autowired
        DriverController driverController;
 
@@ -98,7 +98,7 @@ abstract class GenericApplicationTests<K, V>
                await(numberOfGeneratedMessages + " records received")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
+                               .until(() -> recordHandler.receivedMessages.size() >= numberOfGeneratedMessages);
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
@@ -140,7 +140,7 @@ abstract class GenericApplicationTests<K, V>
 
                checkSeenOffsetsForProgress();
                assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedRecords.size())
+               assertThat(recordHandler.receivedMessages.size())
                                .describedAs("Received not all sent events")
                                .isLessThan(numberOfGeneratedMessages);
 
@@ -347,7 +347,7 @@ abstract class GenericApplicationTests<K, V>
 
                oldOffsets = new HashMap<>();
                recordHandler.seenOffsets = new HashMap<>();
-               recordHandler.receivedRecords = new HashSet<>();
+               recordHandler.receivedMessages = new HashSet<>();
 
                doForCurrentOffsets((tp, offset) ->
                {
index 37d3f65..cc5908a 100644 (file)
@@ -9,26 +9,45 @@ import java.util.Set;
 
 
 @RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
 {
-  private final RecordHandler<K, V> handler;
+  private final RecordHandler handler;
 
   Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
+  Set<Message> receivedMessages;
 
 
-  public void onNewRecord(ConsumerRecord<K, V> record)
+  public void onNewRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      Message messgage)
   {
-    seenOffsets.put(
-      new TopicPartition(record.topic(), record.partition()),
-      record.offset());
-    receivedRecords.add(record);
+    seenOffsets.put(new TopicPartition(topic, partition), offset);
+    receivedMessages.add(messgage);
   }
 
   @Override
-  public void accept(ConsumerRecord<K, V> record)
+  public void addNumber(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageAddNumber message)
   {
-    this.onNewRecord(record);
-    handler.accept(record);
+    this.onNewRecord(topic, partition, offset, message);
+    handler.addNumber(user, topic, partition, offset, message);
+  }
+
+  @Override
+  public void calcSum(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageCalculateSum message)
+  {
+    this.onNewRecord(topic, partition, offset, message);
+    handler.calcSum(user, topic, partition, offset, message);
   }
 }