Tests: Offsets werden unter TopicPartition abgelegt
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 13:25:57 +0000 (15:25 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Apr 2022 13:23:50 +0000 (15:23 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 5e439d6..8461824 100644 (file)
@@ -71,7 +71,7 @@ class ApplicationTests
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               Map<Integer, Long> offsets = runEndlessConsumer(record ->
+               Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
                {
                        received.add(record);
                        if (received.size() == 100)
@@ -90,9 +90,9 @@ class ApplicationTests
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-               Map<Integer, Long> oldOffsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
-               Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+               Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
 
                check(oldOffsets);
        }
@@ -139,11 +139,15 @@ class ApplicationTests
                }
        }
 
-       Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+       Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
        {
-               Map<Integer, Long> offsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
-               Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+               Map<TopicPartition, Long> offsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+               Consumer<ConsumerRecord<String, Long>> captureOffset =
+                               record ->
+                                               offsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
                EndlessConsumer<String, Long> endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
@@ -173,11 +177,11 @@ class ApplicationTests
                kafkaConsumer.unsubscribe();
        }
 
-       void check(Map<Integer, Long> offsets)
+       void check(Map<TopicPartition, Long> offsets)
        {
                doForCurrentOffsets((tp, offset) ->
                {
-                       Long expected = offsets.get(tp.partition()) + 1;
+                       Long expected = offsets.get(tp) + 1;
                        log.debug("Checking, if the offset for {} is {}", tp, expected);
                        assertThat(offset).isEqualTo(expected);
                });