`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
index b4efdd6..d9f4e67 100644 (file)
@@ -1,22 +1,52 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Map;
 
 
 @RequiredArgsConstructor
-public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
 {
-  private final RecordHandler<K, V> handler;
+  private final RecordHandler handler;
+
+  Map<TopicPartition, Long> seenOffsets;
+  int receivedMessages;
 
 
-  public abstract void onNewRecord(ConsumerRecord<K, V> record);
+  public void onNewRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      Message messgage)
+  {
+    seenOffsets.put(new TopicPartition(topic, partition), offset);
+    receivedMessages++;
+  }
 
+  @Override
+  public void addNumber(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageAddNumber message)
+  {
+    this.onNewRecord(topic, partition, offset, message);
+    handler.addNumber(topic, partition, offset, user, message);
+  }
 
   @Override
-  public void accept(ConsumerRecord<K, V> record)
+  public void calculateSum(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageCalculateSum message)
   {
-    this.onNewRecord(record);
-    handler.accept(record);
+    this.onNewRecord(topic, partition, offset, message);
+    handler.calculateSum(topic, partition, offset, user, message);
   }
 }