37d3f659f9c78b24a0a62da4e26b8a1169651f68
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.common.TopicPartition;
6
7 import java.util.Map;
8 import java.util.Set;
9
10
11 @RequiredArgsConstructor
12 public class TestRecordHandler<K, V> implements RecordHandler<K, V>
13 {
14   private final RecordHandler<K, V> handler;
15
16   Map<TopicPartition, Long> seenOffsets;
17   Set<ConsumerRecord<K, V>> receivedRecords;
18
19
20   public void onNewRecord(ConsumerRecord<K, V> record)
21   {
22     seenOffsets.put(
23       new TopicPartition(record.topic(), record.partition()),
24       record.offset());
25     receivedRecords.add(record);
26   }
27
28   @Override
29   public void accept(ConsumerRecord<K, V> record)
30   {
31     this.onNewRecord(record);
32     handler.accept(record);
33   }
34 }