Tests: Offsets are stored under TopicPartition
[demos/kafka/training] src/test/java/de/juplo/kafka/ApplicationTests.java
index 5e439d6..8461824 100644
@@ -71,7 +71,7 @@ class ApplicationTests
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               Map<Integer, Long> offsets = runEndlessConsumer(record ->
+               Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
                {
                        received.add(record);
                        if (received.size() == 100)
@@ -90,9 +90,9 @@ class ApplicationTests
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-               Map<Integer, Long> oldOffsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
-               Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+               Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
 
                check(oldOffsets);
        }
@@ -139,11 +139,15 @@ class ApplicationTests
                }
        }
 
-       Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+       Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
        {
-               Map<Integer, Long> offsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
-               Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+               Map<TopicPartition, Long> offsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+               Consumer<ConsumerRecord<String, Long>> captureOffset =
+                               record ->
+                                               offsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
                EndlessConsumer<String, Long> endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
@@ -173,11 +177,11 @@ class ApplicationTests
                kafkaConsumer.unsubscribe();
        }
 
-       void check(Map<Integer, Long> offsets)
+       void check(Map<TopicPartition, Long> offsets)
        {
                doForCurrentOffsets((tp, offset) ->
                {
-                       Long expected = offsets.get(tp.partition()) + 1;
+                       Long expected = offsets.get(tp) + 1;
                        log.debug("Checking, if the offset for {} is {}", tp, expected);
                        assertThat(offset).isEqualTo(expected);
                });
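
The change swaps the map key from the bare partition number (Integer) to the full TopicPartition, so offsets for identically numbered partitions of different topics can no longer collide. The following minimal sketch (the class name OffsetCapture is hypothetical, not part of the commit) illustrates the capture logic under the assumption that only the standard Kafka client types ConsumerRecord and TopicPartition are used:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch: remember the last seen offset per partition,
    // keyed by TopicPartition instead of the bare partition number.
    class OffsetCapture
    {
        private final Map<TopicPartition, Long> offsets = new HashMap<>();

        // Handler that records topic, partition and offset of each consumed record
        Consumer<ConsumerRecord<String, Long>> handler()
        {
            return record ->
                    offsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            record.offset());
        }

        Map<TopicPartition, Long> offsets()
        {
            return offsets;
        }
    }

With this keying, the check in the test can compare the committed offset for a TopicPartition directly against the captured value plus one, as done in check(Map<TopicPartition, Long>).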