`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
index 37d3f65..d9f4e67 100644 (file)
@@ -1,34 +1,52 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 
 @RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
 {
-  private final RecordHandler<K, V> handler;
+  private final RecordHandler handler;
 
   Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
+  int receivedMessages;
 
 
-  public void onNewRecord(ConsumerRecord<K, V> record)
+  public void onNewRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      Message messgage)
   {
-    seenOffsets.put(
-      new TopicPartition(record.topic(), record.partition()),
-      record.offset());
-    receivedRecords.add(record);
+    seenOffsets.put(new TopicPartition(topic, partition), offset);
+    receivedMessages++;
   }
 
   @Override
-  public void accept(ConsumerRecord<K, V> record)
+  public void addNumber(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageAddNumber message)
   {
-    this.onNewRecord(record);
-    handler.accept(record);
+    this.onNewRecord(topic, partition, offset, message);
+    handler.addNumber(topic, partition, offset, user, message);
+  }
+
+  @Override
+  public void calculateSum(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageCalculateSum message)
+  {
+    this.onNewRecord(topic, partition, offset, message);
+    handler.calculateSum(topic, partition, offset, user, message);
   }
 }