WIP:kafkahandler
[demos/kafka/training] / src / test / java / de / juplo / kafka / TestRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.common.TopicPartition;
6
7 import java.util.Map;
8 import java.util.Set;
9
10
11 @RequiredArgsConstructor
12 public class TestRecordHandler implements RecordHandler
13 {
14   private final RecordHandler handler;
15
16   Map<TopicPartition, Long> seenOffsets;
17   Set<Message> receivedMessages;
18
19
20   public void onNewRecord(
21       String topic,
22       Integer partition,
23       Long offset,
24       Message messgage)
25   {
26     seenOffsets.put(new TopicPartition(topic, partition), offset);
27     receivedMessages.add(messgage);
28   }
29
30   @Override
31   public void addNumber(
32       String user,
33       String topic,
34       Integer partition,
35       Long offset,
36       MessageAddNumber message)
37   {
38     this.onNewRecord(topic, partition, offset, message);
39     handler.addNumber(user, topic, partition, offset, message);
40   }
41
42   @Override
43   public void calcSum(
44       String user,
45       String topic,
46       Integer partition,
47       Long offset,
48       MessageCalculateSum message)
49   {
50     this.onNewRecord(topic, partition, offset, message);
51     handler.calcSum(user, topic, partition, offset, message);
52   }
53 }