From ad3b3853816932c81e019f7746f7589ffc2b4c55 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 10 Apr 2022 15:25:57 +0200
Subject: [PATCH] Tests: Offsets are stored under TopicPartition

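The offset maps captured and checked in ApplicationTests are now keyed by
TopicPartition instead of the bare partition number, so the entries line up
directly with the (topic, partition) pairs that doForCurrentOffsets reports.

A minimal sketch of the new keying (the topic name and offset values below are
illustrative only; it assumes org.apache.kafka.common.TopicPartition from
kafka-clients on the classpath):

    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    class OffsetKeyingSketch
    {
        public static void main(String[] args)
        {
            // Offsets are keyed by (topic, partition) instead of the partition number alone.
            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition("test", 0), 41L); // "test" and 41 are made-up values

            // TopicPartition implements equals()/hashCode(), so a fresh instance with the
            // same topic and partition finds the stored offset.
            Long expected = offsets.get(new TopicPartition("test", 0)) + 1;
            System.out.println(expected); // 42
        }
    }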
---
 .../java/de/juplo/kafka/ApplicationTests.java | 24 +++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 5e439d6..8461824 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -71,7 +71,7 @@ class ApplicationTests
 		send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
 		Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-		Map<Integer, Long> offsets = runEndlessConsumer(record ->
+		Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
 		{
 			received.add(record);
 			if (received.size() == 100)
@@ -90,9 +90,9 @@ class ApplicationTests
 						? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
 						: new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-		Map<Integer, Long> oldOffsets = new HashMap<>();
-		doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
-		Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+		Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+		doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+		Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
 
 		check(oldOffsets);
 	}
@@ -139,11 +139,15 @@ class ApplicationTests
 		}
 	}
 
-	Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+	Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
 	{
-		Map<Integer, Long> offsets = new HashMap<>();
-		doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
-		Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+		Map<TopicPartition, Long> offsets = new HashMap<>();
+		doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+		Consumer<ConsumerRecord<String, Long>> captureOffset =
+				record ->
+						offsets.put(
+								new TopicPartition(record.topic(), record.partition()),
+								record.offset());
 		EndlessConsumer<String, Long> endlessConsumer =
 				new EndlessConsumer<>(
 						executor,
@@ -173,11 +177,11 @@ class ApplicationTests
 		kafkaConsumer.unsubscribe();
 	}
 
-	void check(Map<Integer, Long> offsets)
+	void check(Map<TopicPartition, Long> offsets)
 	{
 		doForCurrentOffsets((tp, offset) ->
 		{
-			Long expected = offsets.get(tp.partition()) + 1;
+			Long expected = offsets.get(tp) + 1;
 			log.debug("Checking, if the offset for {} is {}", tp, expected);
 			assertThat(offset).isEqualTo(expected);
 		});
-- 
2.20.1