WIP:kafkahandler
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
index 37d3f65..cc5908a 100644 (file)
@@ -9,26 +9,45 @@ import java.util.Set;
 
 
 @RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
+public class TestRecordHandler implements RecordHandler
 {
-  private final RecordHandler<K, V> handler;
+  private final RecordHandler handler;
 
   Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
+  Set<Message> receivedMessages;
 
 
-  public void onNewRecord(ConsumerRecord<K, V> record)
+  public void onNewRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      Message messgage)
   {
-    seenOffsets.put(
-      new TopicPartition(record.topic(), record.partition()),
-      record.offset());
-    receivedRecords.add(record);
+    seenOffsets.put(new TopicPartition(topic, partition), offset);
+    receivedMessages.add(messgage);
   }
 
   @Override
-  public void accept(ConsumerRecord<K, V> record)
+  public void addNumber(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageAddNumber message)
   {
-    this.onNewRecord(record);
-    handler.accept(record);
+    this.onNewRecord(topic, partition, offset, message);
+    handler.addNumber(user, topic, partition, offset, message);
+  }
+
+  @Override
+  public void calcSum(
+      String user,
+      String topic,
+      Integer partition,
+      Long offset,
+      MessageCalculateSum message)
+  {
+    this.onNewRecord(topic, partition, offset, message);
+    handler.calcSum(user, topic, partition, offset, message);
   }
 }