Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
index 4047093..37d3f65 100644 (file)
@@ -4,15 +4,26 @@ import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Map;
+import java.util.Set;
+
 
 @RequiredArgsConstructor
-public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler<K, V> implements RecordHandler<K, V>
 {
   private final RecordHandler<K, V> handler;
 
+  Map<TopicPartition, Long> seenOffsets;
+  Set<ConsumerRecord<K, V>> receivedRecords;
 
-  public abstract void onNewRecord(ConsumerRecord<K, V> record);
 
+  public void onNewRecord(ConsumerRecord<K, V> record)
+  {
+    seenOffsets.put(
+      new TopicPartition(record.topic(), record.partition()),
+      record.offset());
+    receivedRecords.add(record);
+  }
 
   @Override
   public void accept(ConsumerRecord<K, V> record)
@@ -20,22 +31,4 @@ public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
     this.onNewRecord(record);
     handler.accept(record);
   }
-  @Override
-
-  public void beforeNextPoll()
-  {
-    handler.beforeNextPoll();
-  }
-
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
-  {
-    handler.onPartitionAssigned(tp);
-  }
-
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
-  {
-    handler.onPartitionRevoked(tp);
-  }
 }