Backport von Verbesserungen / Erweiterungen der Tests:
authorKai Moritz <kai@juplo.de>
Fri, 9 Sep 2022 09:46:25 +0000 (11:46 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 9 Sep 2022 11:31:24 +0000 (13:31 +0200)
* Integration-Test `ApplicationIT`, der prüft, ob die Spring-Boot
  Anwendung ohne Fehler startet und dies über den Endpoint auch meldet.
* Inzwischen hinzugefügte `.editorconfig` übernommen.
* Fachspezifisches Interface `RecordHandler` statt `java.util.Consumer`.
* Kleinere Korrekturen / Verbesserungen an `GenericApplicationTests`
  übernommen.

.editorconfig [new file with mode: 0644]
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/RecordHandler.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java
src/test/java/de/juplo/kafka/TestRecordHandler.java [new file with mode: 0644]

diff --git a/.editorconfig b/.editorconfig
new file mode 100644 (file)
index 0000000..633c98a
--- /dev/null
@@ -0,0 +1,13 @@
+root = true
+
+[*]
+indent_style = space
+indent_size = tab
+tab_width = 2
+charset = utf-8
+end_of_line = lf
+trim_trailing_whitespace = true
+insert_final_newline = false
+
+[*.properties]
+charset = latin1
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6fd5d5f..e664a07 100644 (file)
--- a/pom.xml
+++ b/pom.xml
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
     </plugins>
   </build>
 
index 766740b..6bde5ff 100644 (file)
@@ -11,7 +11,6 @@ import org.springframework.context.annotation.Configuration;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 
 
 @Configuration
@@ -19,7 +18,7 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public RecordHandler<String, Long> recordHandler()
   {
     return (record) ->
     {
@@ -31,7 +30,7 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
+      RecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -40,7 +39,7 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            handler);
+            recordHandler);
   }
 
   @Bean
index 8802df9..788a4a7 100644 (file)
@@ -2,7 +2,10 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -25,7 +28,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final RecordHandler handler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
new file mode 100644 (file)
index 0000000..327ac9f
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.function.Consumer;
+
+
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+{
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
new file mode 100644 (file)
index 0000000..67b9d75
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.ApplicationIT.TOPIC;
+
+
+@SpringBootTest(
+    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+    properties = {
+        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC)
+@AutoConfigureDataMongo
+public class ApplicationIT
+{
+  public static final String TOPIC = "FOO";
+
+  @LocalServerPort
+  private int port;
+
+  @Autowired
+  private TestRestTemplate restTemplate;
+
+
+
+  @Test
+  public void testApplicationStartup()
+  {
+    restTemplate.getForObject(
+        "http://localhost:" + port + "/actuator/health",
+        String.class
+        )
+        .contains("UP");
+  }
+}
index 1e73040..1781b1d 100644 (file)
@@ -1,16 +1,13 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.ContextConfiguration;
 
-import java.util.Set;
 import java.util.function.Consumer;
 
 
@@ -73,9 +70,8 @@ public class ApplicationTests extends GenericApplicationTests<String, Long>
   @TestConfiguration
   public static class Configuration
   {
-    @Primary
     @Bean
-    public Consumer<ConsumerRecord<String, Long>> consumer()
+    public RecordHandler<String, Long> recordHandler()
     {
       return (record) ->
       {
index 649cdba..9465ce6 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -47,13 +48,17 @@ abstract class GenericApplicationTests<K, V>
 
 
        @Autowired
-       KafkaConsumer<K, V> kafkaConsumer;
+       org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
        @Autowired
        Consumer<ConsumerRecord<K, V>> consumer;
        @Autowired
-       ApplicationProperties properties;
+       ApplicationProperties applicationProperties;
        @Autowired
        ExecutorService executor;
+       @Autowired
+       ConsumerRebalanceListener rebalanceListener;
+       @Autowired
+       RecordHandler<K, V> recordHandler;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
@@ -76,7 +81,7 @@ abstract class GenericApplicationTests<K, V>
        /** Tests methods */
 
        @Test
-       void commitsCurrentOffsetsOnSuccess()
+       void commitsCurrentOffsetsOnSuccess() throws Exception
        {
                int numberOfGeneratedMessages =
                                recordGenerator.generate(false, false, messageSender);
@@ -99,6 +104,7 @@ abstract class GenericApplicationTests<K, V>
                                .isThrownBy(() -> endlessConsumer.exitStatus())
                                .describedAs("Consumer should still be running");
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -196,7 +202,7 @@ abstract class GenericApplicationTests<K, V>
                        Long expected = offsetsToCheck.get(tp) + 1;
                        log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
                        assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
+                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
                                        .isLessThanOrEqualTo(expected);
                        isOffsetBehindSeen.add(offset < expected);
                });
@@ -313,16 +319,16 @@ abstract class GenericApplicationTests<K, V>
        {
                Properties props;
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
                props.put("linger.ms", 100);
                props.put("key.serializer", BytesSerializer.class.getName());
                props.put("value.serializer", BytesSerializer.class.getName());
                testRecordProducer = new KafkaProducer<>(props);
 
                props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
+               props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
                props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", properties.getGroupId());
+               props.put("group.id", applicationProperties.getGroupId());
                props.put("key.deserializer", BytesDeserializer.class.getName());
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
@@ -339,21 +345,25 @@ abstract class GenericApplicationTests<K, V>
                        seenOffsets.put(tp, offset - 1);
                });
 
-               Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
-                               record ->
+               TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+                               new TestRecordHandler<K, V>(recordHandler)
                                {
-                                       seenOffsets.put(
-                                                       new TopicPartition(record.topic(), record.partition()),
-                                                       record.offset());
-                                       receivedRecords.add(record);
-                                       consumer.accept(record);
+                                       @Override
+                                       public void onNewRecord(ConsumerRecord<K, V> record)
+                                       {
+                                               seenOffsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
+                                               receivedRecords.add(record);
+                                               consumer.accept(record);
+                                       }
                                };
 
                endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
+                                               applicationProperties.getClientId(),
+                                               applicationProperties.getTopic(),
                                                kafkaConsumer,
                                                captureOffsetAndExecuteTestHandler);
 
@@ -365,7 +375,6 @@ abstract class GenericApplicationTests<K, V>
        {
                try
                {
-                       endlessConsumer.stop();
                        testRecordProducer.close();
                        offsetConsumer.close();
                }
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
new file mode 100644 (file)
index 0000000..b4efdd6
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+
+@RequiredArgsConstructor
+public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+{
+  private final RecordHandler<K, V> handler;
+
+
+  public abstract void onNewRecord(ConsumerRecord<K, V> record);
+
+
+  @Override
+  public void accept(ConsumerRecord<K, V> record)
+  {
+    this.onNewRecord(record);
+    handler.accept(record);
+  }
+}