From: Kai Moritz <kai@juplo.de>
Date: Mon, 11 Apr 2022 08:04:53 +0000 (+0200)
Subject: Tests: Refaktorisiert - Methoden-Reihenfolge aufgeräumt und kommentiert
X-Git-Tag: wip-DEPRECATED~11^2^2~12
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2939f4b7bae23df34968ce3d87be1b83cf0fba90;p=demos%2Fkafka%2Ftraining

Tests: Refaktorisiert - Methoden-Reihenfolge aufgeräumt und kommentiert
---

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index f64ab43..e35b223 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -71,6 +71,8 @@ class ApplicationTests
 	Map<TopicPartition, Long> newOffsets;
 
 
+	/** Tests methods */
+
 	@Test
 	@Order(1) // << The poistion pill is not skipped. Hence, this test must run first
 	void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
@@ -111,6 +113,55 @@ class ApplicationTests
 	}
 
 
+	/** Helper methods for the verification of expectations */
+
+	void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+	{
+		doForCurrentOffsets((tp, offset) ->
+		{
+			Long expected = offsetsToCheck.get(tp) + 1;
+			log.debug("Checking, if the offset for {} is {}", tp, expected);
+			assertThat(offset).isEqualTo(expected);
+		});
+	}
+
+	void checkSeenOffsetsForProgress()
+	{
+		// Be sure, that some messages were consumed...!
+		Set<TopicPartition> withProgress = new HashSet<>();
+		partitions().forEach(tp ->
+		{
+			Long oldOffset = oldOffsets.get(tp);
+			Long newOffset = newOffsets.get(tp);
+			if (!oldOffset.equals(newOffset))
+			{
+				log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+				withProgress.add(tp);
+			}
+		});
+		assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+	}
+
+
+	/** Helper methods for setting up and running the tests */
+
+	void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+	{
+		offsetConsumer.assign(partitions());
+		partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+		offsetConsumer.unsubscribe();
+	}
+
+	List<TopicPartition> partitions()
+	{
+		return
+				IntStream
+						.range(0, PARTITIONS)
+						.mapToObj(partition -> new TopicPartition(TOPIC, partition))
+						.collect(Collectors.toList());
+	}
+
+
 	void send100Messages(Function<Long, Bytes> messageGenerator)
 	{
 		long i = 0;
@@ -152,6 +203,7 @@ class ApplicationTests
 		}
 	}
 
+
 	@BeforeEach
 	public void init()
 	{
@@ -186,50 +238,6 @@ class ApplicationTests
 		endlessConsumer.start();
 	}
 
-	List<TopicPartition> partitions()
-	{
-		return
-				IntStream
-						.range(0, PARTITIONS)
-						.mapToObj(partition -> new TopicPartition(TOPIC, partition))
-						.collect(Collectors.toList());
-	}
-
-	void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-	{
-		offsetConsumer.assign(partitions());
-		partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-		offsetConsumer.unsubscribe();
-	}
-
-	void checkSeenOffsetsForProgress()
-	{
-		// Be sure, that some messages were consumed...!
-		Set<TopicPartition> withProgress = new HashSet<>();
-		partitions().forEach(tp ->
-		{
-			Long oldOffset = oldOffsets.get(tp);
-			Long newOffset = newOffsets.get(tp);
-			if (!oldOffset.equals(newOffset))
-			{
-				log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-				withProgress.add(tp);
-			}
-		});
-		assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
-	}
-
-	void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-	{
-		doForCurrentOffsets((tp, offset) ->
-		{
-			Long expected = offsetsToCheck.get(tp) + 1;
-			log.debug("Checking, if the offset for {} is {}", tp, expected);
-			assertThat(offset).isEqualTo(expected);
-		});
-	}
-
-
 	@AfterEach
 	public void deinit()
 	{
@@ -243,6 +251,7 @@ class ApplicationTests
 		}
 	}
 
+
 	@TestConfiguration
 	@Import(ApplicationConfiguration.class)
 	public static class Configuration