counter: 1.2.8 - Reconfigured tests to send data as domain-instances counter-1.2.8
authorKai Moritz <kai@juplo.de>
Sun, 12 May 2024 16:06:09 +0000 (18:06 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 20:32:57 +0000 (22:32 +0200)
pom.xml
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java

diff --git a/pom.xml b/pom.xml
index 22b9da0..0b134f3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>counter</artifactId>
-       <version>1.2.7</version>
+       <version>1.2.8</version>
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
index b412fe4..559b171 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.counter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeEach;
@@ -13,12 +14,18 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
 import static org.awaitility.Awaitility.*;
@@ -41,7 +48,7 @@ public class CounterApplicationIT
        static final int PARTITIONS = 2;
 
        @Autowired
-       KafkaTemplate<String, String> kafkaTemplate;
+       KafkaTemplate<String, Word> kafkaTemplate;
        @Autowired
        Consumer consumer;
 
@@ -85,6 +92,28 @@ public class CounterApplicationIT
        @TestConfiguration
        static class Configuration
        {
+               @Bean
+               ProducerFactory<?, ?> producerFactory(Properties streamProcessorProperties)
+               {
+                       Map<String, Object> propertyMap = streamProcessorProperties
+                                       .entrySet()
+                                       .stream()
+                                       .collect(
+                                                       Collectors.toMap(
+                                                                       entry -> (String)entry.getKey(),
+                                                                       entry -> entry.getValue()
+                                                       ));
+
+                       propertyMap.put(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+                       propertyMap.put(
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                       JsonSerializer.class.getName());
+
+                       return new DefaultKafkaProducerFactory<>(propertyMap);
+               }
+
                @Bean
                Consumer consumer()
                {
index fe85dc4..902e93f 100644 (file)
@@ -8,6 +8,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.List;
 import java.util.Properties;
@@ -33,10 +34,10 @@ public class CounterStreamProcessorTopologyTest
 
     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
 
-    TestInputTopic<String, String> in = testDriver.createInputTopic(
+    TestInputTopic<String, Word> in = testDriver.createInputTopic(
         IN,
-        new StringSerializer(),
-        new StringSerializer());
+        new JsonSerializer<>(),
+        new JsonSerializer<>());
 
     TestOutputTopic<String, String> out = testDriver.createOutputTopic(
         OUT,
index 8ff7022..f795e77 100644 (file)
@@ -8,41 +8,41 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static void writeInputData(BiConsumer<String, String> consumer)
+       static void writeInputData(BiConsumer<String, Word> consumer)
        {
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Hallo\"}");
+                               Word.of("peter","Hallo"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"Müsch\"}");
+                               Word.of("klaus","Müsch"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Welt\"}");
+                               Word.of("peter","Welt"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
                consumer.accept(
                                "peter",
-                               "{\"user\":\"peter\",\"word\":\"Boäh\"}");
+                               Word.of("peter","Boäh"));
                consumer.accept(
                                "klaus",
-                               "{\"user\":\"klaus\",\"word\":\"s\"}");
+                               Word.of("klaus","s"));
        }
 
        static void assertExpectedResult(List<Message> receivedMessages)